Skip to main content

Polling Best Practices

Learn how to efficiently poll for job completion without wasting resources or hitting rate limits.

Why Polling?

Polling is the simplest way to wait for job completion:

  • No server setup required (unlike webhooks)
  • Works in any environment
  • Easy to implement and debug

Basic Polling

The simplest polling implementation:

import time
import requests

def poll_job(job_id: str, api_key: str) -> dict:
    """Repeatedly fetch a job's status until it reaches a terminal state.

    Args:
        job_id: Identifier of the job to watch.
        api_key: Bearer token used to authenticate each request.

    Returns:
        The final job payload (its status is "done" or "failed").
    """
    url = f"https://api.knowhereto.ai/v1/jobs/{job_id}"
    auth_headers = {"Authorization": f"Bearer {api_key}"}

    while True:
        job = requests.get(url, headers=auth_headers).json()
        if job["status"] in ("done", "failed"):
            return job
        time.sleep(5)  # fixed 5-second pause between polls

Problem: This polls at a fixed rate, which may be too fast for long jobs or too slow for quick ones.

Exponential Backoff

Better approach: start with short intervals and gradually increase:

import time
import requests

def poll_with_backoff(
    job_id: str,
    api_key: str,
    initial_delay: float = 2.0,
    max_delay: float = 10.0,
    backoff_factor: float = 1.5,
    timeout: float = 300.0,
) -> dict:
    """Poll a job with exponentially increasing delays between requests.

    Args:
        job_id: Identifier of the job to watch.
        api_key: Bearer token used to authenticate each request.
        initial_delay: Seconds to wait before the second poll.
        max_delay: Ceiling for the inter-poll delay.
        backoff_factor: Multiplier applied to the delay after each poll.
        timeout: Overall budget in seconds before giving up.

    Returns:
        The completed job payload.

    Raises:
        Exception: If the job ends in the "failed" state.
        TimeoutError: If the job is still running after `timeout` seconds.
    """
    url = f"https://api.knowhereto.ai/v1/jobs/{job_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    deadline = time.time() + timeout
    delay = initial_delay

    while time.time() < deadline:
        job = requests.get(url, headers=headers).json()
        status = job["status"]

        if status == "done":
            return job
        if status == "failed":
            raise Exception(f"Job failed: {job['error']['message']}")

        # Log a status line; include page counts when the API reports them.
        if "progress" in job:
            prog = job["progress"]
            print(f"[{status}] {prog['processed_pages']}/{prog['total_pages']} pages")
        else:
            print(f"[{status}] Processing...")

        # Sleep, then widen the interval (capped at max_delay).
        time.sleep(delay)
        delay = min(delay * backoff_factor, max_delay)

    raise TimeoutError(f"Job did not complete within {timeout} seconds")

Based on typical processing times:

| Document Size        | Initial Delay | Max Delay | Expected Time |
| -------------------- | ------------- | --------- | ------------- |
| Small (1-10 pages)   | 2s            | 5s        | 10-30s        |
| Medium (10-50 pages) | 3s            | 10s       | 30-120s       |
| Large (50+ pages)    | 5s            | 15s       | 2-5 min       |
def smart_poll(job_id: str, api_key: str, estimated_pages: int = 10) -> dict:
    """Poll a job using backoff parameters tuned to the document size.

    Args:
        job_id: Identifier of the job to watch.
        api_key: Bearer token used to authenticate each request.
        estimated_pages: Rough page count, used to pick a polling tier.

    Returns:
        The completed job payload from `poll_with_backoff`.
    """
    # Tiers of (initial_delay, max_delay, timeout) by document size.
    if estimated_pages <= 10:
        initial, cap, budget = 2.0, 5.0, 60.0
    elif estimated_pages <= 50:
        initial, cap, budget = 3.0, 10.0, 180.0
    else:
        initial, cap, budget = 5.0, 15.0, 600.0

    return poll_with_backoff(
        job_id,
        api_key,
        initial_delay=initial,
        max_delay=cap,
        timeout=budget,
    )

Handling Rate Limits

The API allows 60 requests per minute. If you hit the limit:

def poll_with_rate_limit_handling(job_id: str, api_key: str) -> dict:
    """Poll a job while respecting the API's 60-requests/minute limit.

    On a 429 response, waits for the server-supplied Retry-After before
    retrying; also slows its own cadence when the remaining quota is low.

    Args:
        job_id: Identifier of the job to watch.
        api_key: Bearer token used to authenticate each request.

    Returns:
        The final job payload (status "done" or "failed").
    """
    url = f"https://api.knowhereto.ai/v1/jobs/{job_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    delay = 2.0

    while True:
        response = requests.get(url, headers=headers)

        if response.status_code == 429:
            # Honor the server's requested wait, defaulting to 60s.
            wait = int(response.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {wait} seconds...")
            time.sleep(wait)
            continue

        job = response.json()
        if job["status"] in ("done", "failed"):
            return job

        # Back off harder when fewer than 10 requests remain in the window.
        remaining = int(response.headers.get("RateLimit-Remaining", 60))
        if remaining < 10:
            delay = max(delay, 5.0)

        time.sleep(delay)
        delay = min(delay * 1.5, 10.0)

Progress Tracking

For user-facing applications, show progress:

from tqdm import tqdm

def poll_with_progress_bar(job_id: str, api_key: str) -> dict:
    """Poll a job while rendering a tqdm progress bar.

    The bar is created lazily on the first response that carries
    progress data, and is always closed on exit via the finally block.

    Args:
        job_id: Identifier of the job to watch.
        api_key: Bearer token used to authenticate each request.

    Returns:
        The completed job payload.

    Raises:
        Exception: If the job ends in the "failed" state.
    """
    url = f"https://api.knowhereto.ai/v1/jobs/{job_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    bar = None
    delay = 2.0

    try:
        while True:
            job = requests.get(url, headers=headers).json()
            state = job["status"]

            if state == "done":
                if bar:
                    bar.close()
                return job
            if state == "failed":
                if bar:
                    bar.close()
                raise Exception(f"Job failed: {job['error']['message']}")

            if "progress" in job:
                prog = job["progress"]
                if bar is None:
                    bar = tqdm(total=prog["total_pages"], desc="Parsing")
                bar.n = prog["processed_pages"]
                bar.refresh()

            time.sleep(delay)
            delay = min(delay * 1.5, 10.0)
    finally:
        # Safety net: ensure the bar is closed even on unexpected errors.
        if bar:
            bar.close()

Parallel Job Polling

When processing multiple documents, poll all jobs efficiently:

import asyncio
import aiohttp

async def poll_job_async(
    session: aiohttp.ClientSession,
    job_id: str,
    api_key: str,
) -> dict:
    """Asynchronously poll one job until it reaches a terminal state.

    Shares the caller's aiohttp session so many jobs can be polled
    concurrently over one connection pool.

    Args:
        session: Open aiohttp client session to issue requests through.
        job_id: Identifier of the job to watch.
        api_key: Bearer token used to authenticate each request.

    Returns:
        The final job payload (status "done" or "failed").
    """
    url = f"https://api.knowhereto.ai/v1/jobs/{job_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    delay = 2.0

    while True:
        async with session.get(url, headers=headers) as response:
            job = await response.json()
        if job["status"] in ("done", "failed"):
            return job
        await asyncio.sleep(delay)
        delay = min(delay * 1.5, 10.0)

async def poll_multiple_jobs(job_ids: list, api_key: str) -> list:
    """Await completion of several jobs concurrently.

    Args:
        job_ids: Identifiers of the jobs to poll.
        api_key: Bearer token used to authenticate each request.

    Returns:
        Final job payloads, in the same order as `job_ids`.
    """
    async with aiohttp.ClientSession() as session:
        coros = (poll_job_async(session, jid, api_key) for jid in job_ids)
        return await asyncio.gather(*coros)

# Usage
api_key = "YOUR_API_KEY"  # define the key before use (was undefined above)
job_ids = ["job_1", "job_2", "job_3"]
results = asyncio.run(poll_multiple_jobs(job_ids, api_key))

Best Practices Summary

  1. Start fast, slow down later: Begin with a short (~2s) delay and let exponential backoff lengthen it for long-running jobs
  2. Use exponential backoff: Don't hammer the API with constant requests
  3. Set timeouts: Always have a maximum wait time
  4. Handle rate limits: Check for 429 responses and respect Retry-After
  5. Show progress: Use progress information for better UX
  6. Log appropriately: Log status changes, not every poll

Next Steps