Time Workday RaaS Requests
I've had a few scenarios recently where I've needed to time RaaS API requests from Workday for testing and optimizing reports.
This is a somewhat unpolished Python script with inline dependencies that I can quickly run with uv.
Using the script
With uv installed, it's as easy as uv run raas_timer.py <url>, where <url> is the RaaS URL to request.
Run the help command with uv run raas_timer.py --help:
Output:
YAML
usage: raas_timer.py [-h] [--user USER] [--count COUNT] [--timeout TIMEOUT] [--download] url
Time a Workday RaaS request.
positional arguments:
url The RaaS URL to request.
options:
-h, --help show this help message and exit
--user USER Workday ISU Username
--count COUNT Number of times to run (default: 1)
--timeout TIMEOUT Timeout in minutes (default: 1)
--download Save the first run as CSV
The script
Save this in a file named raas_timer.py on your computer to use.
Python
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "requests",
# "rich",
# "rich-argparse",
# ]
# ///
#!/usr/bin/env python
from __future__ import annotations
import argparse
import datetime
import getpass
import logging
import os
import threading
import time
from collections.abc import Sequence
from pathlib import Path
import requests
from requests.auth import HTTPBasicAuth
from rich.console import Console
from rich.live import Live
from rich.logging import RichHandler
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
SpinnerColumn,
TextColumn,
TransferSpeedColumn,
)
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
from rich_argparse import RichHelpFormatter
# Setup Rich Console and Logging
# One Console instance is shared by the log handler configured here and by
# the Live / Progress displays created further down in this script.
console = Console()

# Send root-logger output through Rich; only the message and a "[%X]"
# timestamp are shown (source paths are hidden).
_rich_handler = RichHandler(
    console=console,
    show_path=False,
    rich_tracebacks=True,
)
logging.basicConfig(
    handlers=[_rich_handler],
    level="INFO",
    datefmt="[%X]",
    format="%(message)s",
)
log = logging.getLogger("raas_timer")
# --- Utility Functions ---
def format_size(bytes_size: int) -> str:
    """Return Rich markup showing a size as exact bytes and as megabytes.

    One "MB" here is 1024 * 1024 bytes; the megabyte figure is rendered
    with two decimal places and thousands separators.
    """
    bytes_per_mb = 1024 * 1024
    in_megabytes = bytes_size / bytes_per_mb
    exact_part = f"[bold cyan]{bytes_size:,}[/] bytes"
    approx_part = f"([bold magenta]{in_megabytes:,.2f} MB[/])"
    return f"{exact_part} {approx_part}"
def get_timer_content(start_time: datetime.datetime, start_ts: str) -> Text:
    """Build the status line shown while a request is still pending.

    Args:
        start_time: When the request was started; the elapsed time is
            measured from here to ``datetime.datetime.now()``.
        start_ts: Pre-formatted start timestamp shown at the left of the line.

    Returns:
        A styled ``Text`` of the form
        ``<start_ts> | Waiting for Workday... <M>m <SS>s``.
    """
    waited_seconds = int((datetime.datetime.now() - start_time).total_seconds())
    minutes, seconds = divmod(waited_seconds, 60)

    # (fragment, style) pairs, appended left to right.
    segments = (
        (start_ts, "bold cyan"),
        (" | ", "white"),
        ("Waiting for Workday...", "yellow"),
        (f" {minutes}m {seconds:02d}s", "bold white"),
    )
    line = Text()
    for fragment, style in segments:
        line.append(fragment, style=style)
    return line
# --- Core Logic Splits ---
def wait_for_response(
    url: str, auth: HTTPBasicAuth, timeout_min: int
) -> requests.Response:
    """Issue the GET request while showing a live spinner and elapsed timer.

    The request runs in a daemon thread so this thread can keep redrawing the
    status line until the server answers (or the request fails).

    Args:
        url: The URL to request.
        auth: HTTP basic-auth credentials sent with the request.
        timeout_min: Timeout in minutes; converted to seconds and passed to
            ``requests.get(timeout=...)``. Per the requests documentation this
            bounds the wait for the connection/for data on the socket, not the
            total time of the whole exchange.

    Returns:
        The streamed (``stream=True``) response with a non-error status; its
        body has not been read yet and the caller is responsible for
        consuming/closing it.

    Raises:
        Exception: Whatever ``requests.get`` or ``raise_for_status`` raised in
            the worker thread is re-raised here.
        RuntimeError: If the worker finished with neither a response nor an
            error.
    """
    start_time = datetime.datetime.now()
    start_ts = start_time.strftime("%I:%M:%S %p")
    response: requests.Response | None = None
    error: Exception | None = None

    def make_request() -> None:
        nonlocal response, error
        res: requests.Response | None = None
        try:
            res = requests.get(url, auth=auth, timeout=timeout_min * 60, stream=True)
            res.raise_for_status()
            response = res
        except Exception as e:
            # With stream=True the connection stays open until the body is
            # read or the response is closed; release it on an error status.
            if res is not None:
                res.close()
            error = e

    req_thread = threading.Thread(target=make_request, daemon=True)
    req_thread.start()

    spinner = Spinner("dots", style="cyan")
    with Live(console=console, refresh_per_second=10, transient=True) as live:
        while req_thread.is_alive():
            timer_text = get_timer_content(start_time, start_ts)
            renderable = Text.assemble(spinner.render(time.time()), " ", timer_text)
            live.update(renderable)
            # Doubles as the redraw interval: wake up every 0.1 s.
            req_thread.join(timeout=0.1)

    if error is not None:
        raise error
    # Compare against None: the truth value of a Response reflects its HTTP
    # status (``Response.ok``), not whether a response object exists.
    if response is None:
        raise RuntimeError("Request thread died without response or error.")
    return response
def consume_response(response: requests.Response, download: bool) -> tuple[int, int]:
    """Drain the response body behind a progress bar and measure it.

    Both modes read the body with ``iter_content`` so the reported numbers
    are computed the same way whether or not the data is saved:

    * bytes: the summed length of every chunk yielded by ``iter_content``;
    * lines: the number of newline bytes, plus one if the body is non-empty
      and does not end with a newline (the same count as iterating the saved
      file in binary mode).

    Args:
        response: A streamed response whose body has not been read yet.
        download: When true, also write the body to ``<timestamp>.csv`` in
            the directory containing this script.

    Returns:
        ``(line_count, total_bytes)``.
    """
    # Content-Length only sizes the progress bar; a missing or malformed
    # header falls back to an indeterminate bar instead of failing the run.
    try:
        total_size = int(response.headers.get("content-length", 0))
    except ValueError:
        total_size = 0

    line_count = 0
    total_bytes = 0
    last_byte = b""

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        DownloadColumn(),
        TransferSpeedColumn(),
        console=console,
        transient=True,
    ) as progress:
        desc = (
            "[cyan]Downloading content..." if download else "[cyan]Streaming content..."
        )
        task_id = progress.add_task(desc, total=total_size if total_size > 0 else None)

        sink = None
        if download:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = Path(__file__).parent / f"{timestamp}.csv"
            sink = open(filepath, "wb")
        try:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                if sink is not None:
                    sink.write(chunk)
                total_bytes += len(chunk)
                # Count lines on the fly so the file never has to be re-read.
                line_count += chunk.count(b"\n")
                last_byte = chunk[-1:]
                progress.update(task_id, advance=len(chunk))
        finally:
            if sink is not None:
                sink.close()

    # A final line without a trailing newline still counts as a line.
    if total_bytes and last_byte != b"\n":
        line_count += 1
    return line_count, total_bytes
# --- Orchestrator ---
def run_timing_attempt(
    url: str, auth: HTTPBasicAuth, timeout_min: int, download: bool
) -> datetime.timedelta | None:
    """Run one request end to end, log its metrics, and return the duration.

    Args:
        url: The URL to request.
        auth: HTTP basic-auth credentials.
        timeout_min: Request timeout in minutes; half of it is also used as
            the cooldown sleep after a failed attempt.
        download: Whether the response body should be saved to disk.

    Returns:
        The elapsed time from sending the request until the body was fully
        consumed, or ``None`` if the attempt failed (after logging the error
        and sleeping for the cooldown).
    """
    # Function-scope import: rich is already a dependency of this script.
    from rich.markup import escape

    # The log messages below contain Rich markup tags. RichHandler only
    # interprets markup when asked to, so opt in per record.
    rich_markup = {"markup": True}

    # Monotonic clock: unaffected by system clock adjustments mid-request.
    started = time.perf_counter()

    def elapsed() -> datetime.timedelta:
        return datetime.timedelta(seconds=time.perf_counter() - started)

    try:
        response = wait_for_response(url, auth, timeout_min)
        try:
            lines, bytes_count = consume_response(response, download)
        finally:
            # Release the streamed connection even if consuming the body failed.
            response.close()
        duration = elapsed()
        log.info(f"Lines: [bold white]{lines:,}[/]", extra=rich_markup)
        log.info(f"Size: {format_size(bytes_count)}", extra=rich_markup)
        log.info(f"Status: [bold green]{response.status_code}[/]", extra=rich_markup)
        log.info(f"Duration: [bold yellow]{duration}[/]", extra=rich_markup)
        return duration
    except Exception as e:
        duration = elapsed()
        # Escape the error text so brackets in it (e.g. in a URL) are shown
        # literally instead of being parsed as markup tags.
        log.error(
            f"Request [bold red]FAILED[/] after {duration.total_seconds():.1f}s. "
            f"Error: {escape(str(e))}",
            extra=rich_markup,
        )
        log.warning(f"Cooldown: Sleeping for {timeout_min / 2} minutes...")
        time.sleep((timeout_min / 2) * 60)
        return None
# --- Entry Point ---
def _build_summary_table(results: Sequence[datetime.timedelta | None]) -> Table:
    """Build the per-run summary table, captioned with the average success time."""
    table = Table(title="\n📊 Execution Summary", title_style="bold underline")
    table.add_column("Run #", justify="center")
    table.add_column("Duration (Min:Sec)", justify="right")
    table.add_column("Status", justify="left")

    for i, delta in enumerate(results, 1):
        # Explicit None check: a zero-length timedelta is falsy but would
        # still be a successful run.
        if delta is not None:
            m, s = divmod(int(delta.total_seconds()), 60)
            table.add_row(str(i), f"{m}m {s:02d}s", "[green]SUCCESS[/]")
        else:
            table.add_row(str(i), "N/A", "[red]FAILED[/]")

    successes = [d for d in results if d is not None]
    if successes:
        avg_s = sum(d.total_seconds() for d in successes) / len(successes)
        m, s = divmod(int(avg_s), 60)
        table.caption = f"Average Duration (Successes): [bold yellow]{m}m {s:02d}s[/]"
    return table


def main(argv: Sequence[str] | None = None) -> int:
    """Parse arguments, run the timed requests, and print a summary table.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` once the summary has been printed. Invalid arguments exit via
        ``parser.error`` instead of returning.
    """
    parser = argparse.ArgumentParser(
        description="Time a Workday RaaS request.", formatter_class=RichHelpFormatter
    )
    parser.add_argument("url", help="The RaaS URL to request.")
    parser.add_argument("--user", help="Workday ISU Username")
    parser.add_argument(
        "--count", type=int, default=1, help="Number of times to run (default: 1)"
    )
    parser.add_argument(
        "--timeout", type=int, default=1, help="Timeout in minutes (default: 1)"
    )
    parser.add_argument(
        "--download", action="store_true", help="Save the first run as CSV"
    )
    args = parser.parse_args(argv)

    # Reject values that cannot produce a meaningful run before prompting
    # for credentials.
    if args.count < 1:
        parser.error("--count must be at least 1")
    if args.timeout < 1:
        parser.error("--timeout must be at least 1 minute")

    username = args.user or console.input("[bold green]Enter Username:[/] ")
    password = getpass.getpass("Enter Password: ")
    auth = HTTPBasicAuth(username, password)

    results: list[datetime.timedelta | None] = []
    for i in range(1, args.count + 1):
        console.rule(f"[bold blue]Request {i:02d}/{args.count:02d}[/]")
        duration = run_timing_attempt(
            url=args.url,
            auth=auth,
            timeout_min=args.timeout,
            # Only the first run is saved to disk; later runs just stream.
            download=args.download if i == 1 else False,
        )
        results.append(duration)
        print()

    console.print(_build_summary_table(results))
    return 0
if __name__ == "__main__":
    # Ctrl+C during main() is reported and the process is ended immediately
    # with status 1 via os._exit; otherwise main()'s return value becomes the
    # exit status.
    try:
        exit_status = main()
    except KeyboardInterrupt:
        console.print("\n[bold red]Process interrupted by user.[/]")
        os._exit(1)
    else:
        raise SystemExit(exit_status)