Result Handling Guide
Learn how to download, verify, and use parsing results from the Knowhere API.
Getting the Result URL
When a job completes successfully, the response includes a result_url:
{
  "job_id": "job_abc123",
  "status": "done",
  "result_url": "https://results.knowhereto.ai/result_job_abc123.zip?...",
  "result_url_expires_at": "2025-01-16T10:32:45Z",
  "result_checksum": {
    "algorithm": "sha256",
    "value": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
  }
}
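The URL is presigned and short-lived (see Handling Expired URLs below), so it can be worth checking result_url_expires_at before starting a download. A minimal Python sketch of that check, using the field names from the response above:
from datetime import datetime, timezone

def url_is_expired(job: dict) -> bool:
    """True if the presigned result URL has already lapsed."""
    expires_at = datetime.fromisoformat(
        job["result_url_expires_at"].replace("Z", "+00:00")
    )
    return datetime.now(timezone.utc) >= expires_at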
Download and Extract
- Python
- Node.js
import requests
import zipfile
import json
import hashlib
from io import BytesIO
from pathlib import Path

def download_and_extract(job: dict, output_dir: str = "./results") -> dict:
    """Download, verify, and extract job results."""
    # Download ZIP
    print("Downloading results...")
    response = requests.get(job["result_url"], timeout=60)
    response.raise_for_status()
    zip_data = response.content

    # Verify checksum
    print("Verifying integrity...")
    actual_checksum = hashlib.sha256(zip_data).hexdigest()
    expected_checksum = job["result_checksum"]["value"]
    if actual_checksum != expected_checksum:
        raise ValueError(
            f"Checksum mismatch!\n"
            f"Expected: {expected_checksum}\n"
            f"Actual: {actual_checksum}"
        )

    # Extract
    print("Extracting...")
    output_path = Path(output_dir) / job["job_id"]
    output_path.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(BytesIO(zip_data)) as zf:
        zf.extractall(output_path)

    # Load chunks
    with open(output_path / "chunks.json") as f:
        chunks_data = json.load(f)

    print(f"Extracted {len(chunks_data['chunks'])} chunks to {output_path}")
    return chunks_data

# Usage
chunks_data = download_and_extract(job)
import AdmZip from 'adm-zip';
import crypto from 'crypto';
import fs from 'fs';
import path from 'path';

async function downloadAndExtract(job, outputDir = './results') {
  // Download ZIP
  console.log('Downloading results...');
  const response = await fetch(job.result_url);
  if (!response.ok) {
    throw new Error(`Download failed: ${response.status}`);
  }
  const buffer = Buffer.from(await response.arrayBuffer());

  // Verify checksum
  console.log('Verifying integrity...');
  const actualChecksum = crypto.createHash('sha256').update(buffer).digest('hex');
  const expectedChecksum = job.result_checksum.value;
  if (actualChecksum !== expectedChecksum) {
    throw new Error(
      `Checksum mismatch!\n` +
      `Expected: ${expectedChecksum}\n` +
      `Actual: ${actualChecksum}`
    );
  }

  // Extract
  console.log('Extracting...');
  const outputPath = path.join(outputDir, job.job_id);
  fs.mkdirSync(outputPath, { recursive: true });
  const zip = new AdmZip(buffer);
  zip.extractAllTo(outputPath, true);

  // Load chunks
  const chunksJson = fs.readFileSync(path.join(outputPath, 'chunks.json'), 'utf-8');
  const chunksData = JSON.parse(chunksJson);
  console.log(`Extracted ${chunksData.chunks.length} chunks to ${outputPath}`);
  return chunksData;
}

// Usage
const chunksData = await downloadAndExtract(job);
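For very large result packages you may not want to buffer the whole ZIP in memory. One alternative, sketched here in Python against the same job fields as above, is to stream the download to disk while updating the hash incrementally (the 1 MB block size is arbitrary):
import hashlib
import requests

def download_streaming(job: dict, dest: str = "result.zip") -> str:
    """Stream the result ZIP to disk, hashing blocks as they arrive."""
    hasher = hashlib.sha256()
    with requests.get(job["result_url"], stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(dest, "wb") as f:
            for block in response.iter_content(chunk_size=1 << 20):
                hasher.update(block)
                f.write(block)
    if hasher.hexdigest() != job["result_checksum"]["value"]:
        raise ValueError("Checksum mismatch")
    return dest
You can then extract the saved file with zipfile.ZipFile(dest) exactly as in the in-memory version.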
Working with Chunks
Basic Iteration
- Python
- Node.js
for chunk in chunks_data["chunks"]:
    print(f"ID: {chunk['chunk_id']}")
    print(f"Type: {chunk['type']}")
    print(f"Path: {chunk['path']}")
    print(f"Content: {chunk['content'][:200]}...")
    print("---")
for (const chunk of chunksData.chunks) {
  console.log(`ID: ${chunk.chunk_id}`);
  console.log(`Type: ${chunk.type}`);
  console.log(`Path: ${chunk.path}`);
  console.log(`Content: ${chunk.content.slice(0, 200)}...`);
  console.log('---');
}
Filter by Type
- Python
- Node.js
# Get only text chunks
text_chunks = [c for c in chunks_data["chunks"] if c["type"] == "text"]
# Get only tables
table_chunks = [c for c in chunks_data["chunks"] if c["type"] == "table"]
# Get only images
image_chunks = [c for c in chunks_data["chunks"] if c["type"] == "image"]
print(f"Text: {len(text_chunks)}, Tables: {len(table_chunks)}, Images: {len(image_chunks)}")
// Get only text chunks
const textChunks = chunksData.chunks.filter(c => c.type === 'text');
// Get only tables
const tableChunks = chunksData.chunks.filter(c => c.type === 'table');
// Get only images
const imageChunks = chunksData.chunks.filter(c => c.type === 'image');
console.log(`Text: ${textChunks.length}, Tables: ${tableChunks.length}, Images: ${imageChunks.length}`);
Filter by Path
- Python
- Node.js
# Get chunks from a specific section
executive_summary = [
    c for c in chunks_data["chunks"]
    if "Executive Summary" in c["path"]
]

# Get all top-level sections
top_level = [
    c for c in chunks_data["chunks"]
    if c["path"].count("/") == 0
]
// Get chunks from a specific section
const executiveSummary = chunksData.chunks.filter(
  c => c.path.includes('Executive Summary')
);

// Get all top-level sections
const topLevel = chunksData.chunks.filter(
  c => !c.path.includes('/')
);
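Beyond one-off filters, it can help to index chunks by their top-level section in a single pass. A small Python sketch, assuming the /-delimited path format shown above:
from collections import defaultdict

sections = defaultdict(list)
for chunk in chunks_data["chunks"]:
    sections[chunk["path"].split("/")[0]].append(chunk)

for name, section_chunks in sections.items():
    print(f"{name}: {len(section_chunks)} chunks")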
Access Metadata
- Python
- Node.js
for chunk in chunks_data["chunks"]:
    metadata = chunk["metadata"]
    if chunk["type"] == "text":
        print(f"Keywords: {metadata.get('keywords', [])}")
        print(f"Tokens: {metadata.get('tokens')}")
        print(f"Summary: {metadata.get('summary')}")
    elif chunk["type"] == "image":
        print(f"File: {metadata['file_path']}")
        print(f"Alt text: {metadata.get('alt_text')}")
    elif chunk["type"] == "table":
        print(f"File: {metadata['file_path']}")
        print(f"Type: {metadata.get('table_type')}")
for (const chunk of chunksData.chunks) {
  const { metadata } = chunk;
  if (chunk.type === 'text') {
    console.log(`Keywords: ${metadata.keywords || []}`);
    console.log(`Tokens: ${metadata.tokens}`);
    console.log(`Summary: ${metadata.summary}`);
  } else if (chunk.type === 'image') {
    console.log(`File: ${metadata.file_path}`);
    console.log(`Alt text: ${metadata.alt_text}`);
  } else if (chunk.type === 'table') {
    console.log(`File: ${metadata.file_path}`);
    console.log(`Type: ${metadata.table_type}`);
  }
}
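The tokens field is also useful for budgeting before you send chunks to an embedding or chat model. A quick Python sketch that totals tokens across text chunks (a missing tokens value counts as zero):
total_tokens = sum(
    c["metadata"].get("tokens") or 0
    for c in chunks_data["chunks"]
    if c["type"] == "text"
)
print(f"Total text tokens: {total_tokens}")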
Accessing Images and Tables
Images and tables are stored as separate files in the ZIP package.
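A quick way to see what shipped in the package is to list the extraction directory; this Python sketch assumes the extraction path used in the examples above:
from pathlib import Path

extract_dir = Path("./results/job_abc123")
for p in sorted(extract_dir.rglob("*")):
    if p.is_file():
        print(p.relative_to(extract_dir))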
Load an Image
- Python
- Node.js
from PIL import Image
from pathlib import Path

def load_image(extract_dir: str, chunk: dict) -> Image.Image:
    """Load an image from an image chunk."""
    if chunk["type"] != "image":
        raise ValueError("Not an image chunk")
    file_path = Path(extract_dir) / chunk["metadata"]["file_path"]
    return Image.open(file_path)

# Usage
for chunk in image_chunks:
    img = load_image("./results/job_abc123", chunk)
    print(f"Image size: {img.size}")
import sharp from 'sharp';
import path from 'path';

async function loadImage(extractDir, chunk) {
  if (chunk.type !== 'image') {
    throw new Error('Not an image chunk');
  }
  const filePath = path.join(extractDir, chunk.metadata.file_path);
  const image = sharp(filePath);
  const metadata = await image.metadata();
  return { image, metadata };
}

// Usage
for (const chunk of imageChunks) {
  const { metadata } = await loadImage('./results/job_abc123', chunk);
  console.log(`Image size: ${metadata.width}x${metadata.height}`);
}
Load a Table as HTML
- Python
- Node.js
from pathlib import Path

def load_table_html(extract_dir: str, chunk: dict) -> str:
    """Load a table's HTML content."""
    if chunk["type"] != "table":
        raise ValueError("Not a table chunk")
    file_path = Path(extract_dir) / chunk["metadata"]["file_path"]
    return file_path.read_text()

# Usage
for chunk in table_chunks:
    html = load_table_html("./results/job_abc123", chunk)
    print(f"Table HTML: {html[:200]}...")
import fs from 'fs';
import path from 'path';

function loadTableHtml(extractDir, chunk) {
  if (chunk.type !== 'table') {
    throw new Error('Not a table chunk');
  }
  const filePath = path.join(extractDir, chunk.metadata.file_path);
  return fs.readFileSync(filePath, 'utf-8');
}

// Usage
for (const chunk of tableChunks) {
  const html = loadTableHtml('./results/job_abc123', chunk);
  console.log(`Table HTML: ${html.slice(0, 200)}...`);
}
Parse Table to DataFrame
import pandas as pd
from io import StringIO
from pathlib import Path

def table_to_dataframe(extract_dir: str, chunk: dict) -> pd.DataFrame:
    """Convert a table chunk to a pandas DataFrame."""
    file_path = Path(extract_dir) / chunk["metadata"]["file_path"]
    html = file_path.read_text()
    # pandas can read HTML tables; wrap in StringIO, since passing
    # literal HTML strings to read_html is deprecated in recent pandas
    dfs = pd.read_html(StringIO(html))
    return dfs[0] if dfs else pd.DataFrame()

# Usage
for chunk in table_chunks:
    df = table_to_dataframe("./results/job_abc123", chunk)
    print(df.head())
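For LLM prompts, a Markdown rendering of the table is often easier for a model to consume than raw HTML. pandas can produce one directly (DataFrame.to_markdown requires the tabulate package to be installed):
for chunk in table_chunks:
    df = table_to_dataframe("./results/job_abc123", chunk)
    # Render as a Markdown table, dropping the index column
    print(df.to_markdown(index=False))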
Building a RAG Pipeline
Here's how to use chunks in a typical RAG application:
- Python
- Node.js
from typing import List
from openai import OpenAI

client = OpenAI()

def chunks_to_embeddings(chunks: List[dict]) -> List[dict]:
    """Generate embeddings for text chunks."""
    documents = []
    for chunk in chunks:
        if chunk["type"] != "text":
            continue
        # Create embedding
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=chunk["content"]
        )
        documents.append({
            "id": chunk["chunk_id"],
            "content": chunk["content"],
            "embedding": response.data[0].embedding,
            "metadata": {
                "path": chunk["path"],
                "keywords": chunk["metadata"].get("keywords", []),
                "summary": chunk["metadata"].get("summary")
            }
        })
    return documents

# Usage
documents = chunks_to_embeddings(chunks_data["chunks"])
# Store in your vector database (Pinecone, Weaviate, etc.)
import OpenAI from 'openai';

const openai = new OpenAI();

async function chunksToEmbeddings(chunks) {
  const documents = [];
  for (const chunk of chunks) {
    if (chunk.type !== 'text') {
      continue;
    }
    // Create embedding
    const response = await openai.embeddings.create({
      model: 'text-embedding-3-small',
      input: chunk.content
    });
    documents.push({
      id: chunk.chunk_id,
      content: chunk.content,
      embedding: response.data[0].embedding,
      metadata: {
        path: chunk.path,
        keywords: chunk.metadata.keywords || [],
        summary: chunk.metadata.summary
      }
    });
  }
  return documents;
}

// Usage
const documents = await chunksToEmbeddings(chunksData.chunks);
// Store in your vector database (Pinecone, Weaviate, etc.)
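Once the documents are embedded, retrieval is a nearest-neighbor search over the stored vectors. A minimal in-memory Python sketch using cosine similarity with NumPy, as a stand-in for a real vector database (query_embedding would come from the same embeddings endpoint as the documents):
import numpy as np

def top_k(documents: list, query_embedding: list, k: int = 5) -> list:
    """Rank documents by cosine similarity to the query embedding."""
    matrix = np.array([d["embedding"] for d in documents])
    query = np.array(query_embedding)
    # Cosine similarity: dot product over the product of norms
    scores = (matrix @ query) / (
        np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
    )
    return [documents[i] for i in np.argsort(scores)[::-1][:k]]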
Handling Expired URLs
The result_url is a presigned URL that expires after 1 hour. If it has expired, request a fresh one by calling the Get Job endpoint again. Result files are retained for 30 days, so you can obtain a new URL at any time during that window:
- Python
- Node.js
import requests

def get_result_url(job_id: str, api_key: str) -> str:
    """Get a fresh result URL for a completed job."""
    response = requests.get(
        f"https://api.knowhereto.ai/v1/jobs/{job_id}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30
    )
    response.raise_for_status()
    job = response.json()
    if job["status"] != "done":
        raise ValueError(f"Job not complete: {job['status']}")
    return job["result_url"]
async function getResultUrl(jobId, apiKey) {
  const response = await fetch(
    `https://api.knowhereto.ai/v1/jobs/${jobId}`,
    { headers: { 'Authorization': `Bearer ${apiKey}` } }
  );
  if (!response.ok) {
    throw new Error(`Request failed: ${response.status}`);
  }
  const job = await response.json();
  if (job.status !== 'done') {
    throw new Error(`Job not complete: ${job.status}`);
  }
  return job.result_url;
}
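Putting the pieces together, you can refresh the URL and retry transparently when a download fails. A Python sketch combining get_result_url with the earlier download_and_extract; it assumes an expired presigned URL surfaces as an HTTP 403 from the storage backend, which can vary by provider:
import requests

def download_with_refresh(job: dict, api_key: str) -> dict:
    """Download results, refreshing the presigned URL once if it has expired."""
    try:
        return download_and_extract(job)
    except requests.HTTPError as e:
        # Assumed: the storage backend returns 403 for expired presigned URLs
        if e.response is not None and e.response.status_code == 403:
            job["result_url"] = get_result_url(job["job_id"], api_key)
            return download_and_extract(job)
        raise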
Next Steps
- Result Delivery Concepts - Understand the ZIP structure
- Error Handling - Handle download failures