Python SDK Reference

Complete API reference for the Knowhere Python SDK, including all classes, methods, types, and error types.

Installation

```bash
pip install knowhere-python-sdk
```

Or with uv:

```bash
uv add knowhere-python-sdk
```

Classes

Knowhere

Synchronous client. The primary entry point for most applications.

```python
class Knowhere:
    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: Optional[float] = None,
        upload_timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        default_headers: Optional[Dict[str, str]] = None,
    ) -> None
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str \| None | KNOWHERE_API_KEY env | API key (required) |
| base_url | str \| None | https://api.knowhereto.ai | API base URL |
| timeout | float \| None | 60.0 | HTTP request timeout (seconds) |
| upload_timeout | float \| None | 600.0 | File upload timeout (seconds) |
| max_retries | int \| None | 5 | Max retry attempts for retryable errors |
| default_headers | Dict[str, str] \| None | None | Extra headers to include in every request |

Properties

| Property | Type | Description |
| --- | --- | --- |
| jobs | Jobs | Access the jobs resource namespace |

Context Manager

```python
with Knowhere(api_key="sk_...") as client:
    result = client.parse(url="https://example.com/report.pdf")
```

AsyncKnowhere

Asynchronous client for high-throughput applications. Same interface as Knowhere, but all methods are async.

```python
class AsyncKnowhere:
    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: Optional[float] = None,
        upload_timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        default_headers: Optional[Dict[str, str]] = None,
    ) -> None
```

Accepts the same parameters as Knowhere.

Properties

| Property | Type | Description |
| --- | --- | --- |
| jobs | AsyncJobs | Access the async jobs resource namespace |

Context Manager

```python
async with AsyncKnowhere(api_key="sk_...") as client:
    result = await client.parse(url="https://example.com/report.pdf")
```

Methods

Knowhere.parse()

Parse a document end-to-end: create job → upload file (if needed) → poll → download and parse results. Provide exactly one of url or file.

```python
def parse(
    self,
    *,
    url: Optional[str] = None,
    file: Optional[Union[Path, BinaryIO, bytes]] = None,
    file_name: Optional[str] = None,
    data_id: Optional[str] = None,
    parsing_params: Optional[ParsingParams] = None,
    webhook: Optional[WebhookConfig] = None,
    poll_interval: float = 10.0,
    poll_timeout: float = 1800.0,
    verify_checksum: bool = True,
    on_upload_progress: Optional[UploadProgressCallback] = None,
    on_poll_progress: Optional[PollProgressCallback] = None,
) -> ParseResult
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| url | str \| None | None | URL to parse (mutually exclusive with file) |
| file | Path \| BinaryIO \| bytes \| None | None | Local file to upload and parse |
| file_name | str \| None | None | Override filename (inferred from Path if omitted) |
| data_id | str \| None | None | Idempotency / correlation identifier |
| parsing_params | ParsingParams | None | Parsing configuration |
| webhook | WebhookConfig | None | Webhook for job completion notifications |
| poll_interval | float | 10.0 | Seconds between status polls |
| poll_timeout | float | 1800.0 | Max seconds to wait for completion |
| verify_checksum | bool | True | Verify SHA-256 checksum of the downloaded result |
| on_upload_progress | Callable \| None | None | Callback (bytes_sent, total_bytes) |
| on_poll_progress | Callable \| None | None | Callback (JobResult) on each poll |

Returns

Returns a ParseResult containing the manifest, typed chunks, full markdown, and raw ZIP.

Example

```python
from pathlib import Path

# Parse from URL
result = client.parse(url="https://example.com/report.pdf")

# Parse a local file with advanced options
result = client.parse(
    file=Path("report.pdf"),
    parsing_params={"model": "advanced", "ocr_enabled": True},
    poll_interval=5.0,
)

print(result.statistics.total_chunks)
print(result.full_markdown[:500])
```

Jobs.create()

Create a new parsing job.

```python
def create(
    self,
    *,
    source_type: str,
    source_url: Optional[str] = None,
    file_name: Optional[str] = None,
    data_id: Optional[str] = None,
    parsing_params: Optional[ParsingParams] = None,
    webhook: Optional[WebhookConfig] = None,
) -> Job
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| source_type | str | | "url" or "file" |
| source_url | str \| None | None | URL to parse (required when source_type="url") |
| file_name | str \| None | None | Original filename (required when source_type="file") |
| data_id | str \| None | None | Idempotency / correlation identifier |
| parsing_params | ParsingParams | None | Parsing configuration |
| webhook | WebhookConfig | None | Webhook for job completion notifications |

Returns

A Job object. When source_type="file", the object includes upload_url and upload_headers for the next step.


Jobs.upload()

Upload a file for a job created with source_type="file".

```python
def upload(
    self,
    job: Union[Job, str],
    file: Union[Path, BinaryIO, bytes],
    *,
    on_progress: Optional[UploadProgressCallback] = None,
) -> None
```

| Parameter | Type | Description |
| --- | --- | --- |
| job | Job \| str | A Job object or a pre-signed upload URL string |
| file | Path \| BinaryIO \| bytes | The file to upload |
| on_progress | Callable \| None | Optional callback (bytes_sent, total_bytes) |

Jobs.get()

Retrieve the current status and result of a job.

```python
def get(self, job_id: str) -> JobResult
```

| Parameter | Type | Description |
| --- | --- | --- |
| job_id | str | The job identifier |

Returns

A JobResult with the current status, progress, error, and result details.


Jobs.wait()

Poll until the job reaches a terminal status (done or failed).

```python
def wait(
    self,
    job_id: str,
    *,
    poll_interval: float = 10.0,
    poll_timeout: float = 1800.0,
    on_progress: Optional[PollProgressCallback] = None,
) -> JobResult
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str | | The job to poll |
| poll_interval | float | 10.0 | Seconds between polls (adaptive backoff applied) |
| poll_timeout | float | 1800.0 | Max seconds to wait before raising PollingTimeoutError |
| on_progress | Callable \| None | None | Callback (JobResult) on each poll |

Returns

A terminal JobResult. Raises JobFailedError if the job fails, or PollingTimeoutError if the timeout is exceeded.
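Conceptually, wait() is a poll loop with a deadline. The sketch below is a simplified model of that loop, not the SDK's implementation (which also applies adaptive backoff); get_status stands in for jobs.get(), and clock/sleep are injectable for testing:

```python
import time

def wait_sketch(get_status, poll_interval=10.0, poll_timeout=1800.0,
                clock=time.monotonic, sleep=time.sleep):
    """Poll get_status() until the job reaches a terminal status or the deadline passes."""
    deadline = clock() + poll_timeout
    while True:
        result = get_status()
        if result["status"] in ("done", "failed"):
            return result  # the SDK raises JobFailedError for "failed" here
        if clock() >= deadline:
            raise TimeoutError("polling timed out")  # SDK: PollingTimeoutError
        sleep(poll_interval)
```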


Jobs.load()

Download and parse the result ZIP for a completed job.

```python
def load(
    self,
    job_result: Union[JobResult, str],
    *,
    verify_checksum: bool = True,
) -> ParseResult
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_result | JobResult \| str | | A JobResult with result_url, or a direct URL string |
| verify_checksum | bool | True | Verify the SHA-256 checksum from the manifest |

Returns

A fully populated ParseResult.
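The four methods above compose into a manual pipeline equivalent to parse(). A sketch, assuming client is an already-constructed Knowhere instance:

```python
from pathlib import Path

def run_pipeline(client, path: Path):
    """Manual create -> upload -> wait -> load, mirroring what parse() does internally."""
    job = client.jobs.create(source_type="file", file_name=path.name)
    client.jobs.upload(job, path)         # uses job.upload_url under the hood
    final = client.jobs.wait(job.job_id)  # blocks until done or failed
    return client.jobs.load(final)        # downloads and parses the result ZIP
```

Splitting the pipeline like this is useful when you want to persist job_id between steps, e.g. across process restarts.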


AsyncJobs

Asynchronous counterpart to Jobs. All methods have the same signatures but are async:

```python
await client.jobs.create(...)
await client.jobs.upload(...)
await client.jobs.get(...)
await client.jobs.wait(...)
await client.jobs.load(...)
```
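For example, a URL job driven through the async namespace might be wrapped like this (a sketch; client is assumed to be an AsyncKnowhere instance):

```python
async def run_url_job(client, url: str):
    """Async create -> wait -> load for a URL source (no upload step needed)."""
    job = await client.jobs.create(source_type="url", source_url=url)
    final = await client.jobs.wait(job.job_id)
    return await client.jobs.load(final)
```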

Types

Job

Response from POST /v1/jobs — represents a newly created job.

```python
class Job(BaseModel):
    job_id: str
    status: str
    source_type: str
    data_id: Optional[str] = None
    created_at: Optional[datetime] = None
    upload_url: Optional[str] = None
    upload_headers: Optional[Dict[str, str]] = None
    expires_in: Optional[int] = None
```

| Field | Type | Description |
| --- | --- | --- |
| job_id | str | Unique job identifier |
| status | str | Current job status (e.g., waiting-file, pending) |
| source_type | str | "url" or "file" |
| data_id | str \| None | Caller-provided correlation identifier |
| created_at | datetime \| None | Job creation timestamp |
| upload_url | str \| None | Pre-signed upload URL (file mode only) |
| upload_headers | Dict[str, str] \| None | Headers to include in the upload request |
| expires_in | int \| None | Upload URL expiry in seconds |

JobResult

Response from GET /v1/jobs/{job_id} — full job status and result.

```python
class JobResult(BaseModel):
    job_id: str
    status: str
    source_type: str
    data_id: Optional[str] = None
    created_at: Optional[datetime] = None
    progress: Optional[Union[float, JobProgress]] = None
    error: Optional[JobError] = None
    result: Optional[Dict[str, Any]] = None
    result_url: Optional[str] = None
    result_url_expires_at: Optional[datetime] = None
    file_name: Optional[str] = None
    file_extension: Optional[str] = None
    model: Optional[str] = None
    ocr_enabled: Optional[bool] = None
    duration_seconds: Optional[float] = None
    credits_spent: Optional[float] = None
```

| Field | Type | Description |
| --- | --- | --- |
| job_id | str | Unique job identifier |
| status | str | waiting-file, pending, running, done, or failed |
| progress | float \| JobProgress \| None | Parsing progress (when running) |
| error | JobError \| None | Error details (when failed) |
| result_url | str \| None | Pre-signed download URL for the result ZIP |
| result_url_expires_at | datetime \| None | When the download URL expires |
| credits_spent | float \| None | Credits consumed by this job |

Convenience Properties

| Property | Type | Description |
| --- | --- | --- |
| is_terminal | bool | True if status is done or failed |
| is_done | bool | True if status is done |
| is_failed | bool | True if status is failed |

JobError

Embedded error object returned when a job fails. Shares the same structure as HTTP error responses.

```python
class JobError(BaseModel):
    code: str
    message: str
    request_id: Optional[str] = None
    details: Optional[Any] = None
```

| Field | Type | Description |
| --- | --- | --- |
| code | str | Canonical error code (e.g., INVALID_ARGUMENT) |
| message | str | Human-readable error message |
| request_id | str \| None | Tracing identifier (equals job_id for job errors) |
| details | Any \| None | Additional error context (violations, retry hints) |

JobProgress

Progress info returned by the server during parsing.

```python
class JobProgress(BaseModel):
    total_pages: int = 0
    processed_pages: int = 0
```

| Property | Type | Description |
| --- | --- | --- |
| fraction | float | Progress as a 0.0–1.0 fraction |
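The fraction property is a convenience over the two counters. One plausible implementation (a sketch, not the SDK source; note the guard for the period before the server knows total_pages):

```python
def fraction(processed_pages: int, total_pages: int) -> float:
    """Progress as a 0.0-1.0 value; 0.0 while total_pages is still unknown."""
    if total_pages <= 0:
        return 0.0
    return min(processed_pages / total_pages, 1.0)
```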

ParseResult

Eagerly-loaded result of a document parsing job. Contains the manifest, all chunks (with image bytes and table HTML already loaded), the full markdown, and the raw ZIP bytes.

```python
class ParseResult:
    manifest: Manifest
    chunks: List[Chunk]
    full_markdown: str
    hierarchy: Optional[Any]
    raw_zip: bytes
```

| Property | Type | Description |
| --- | --- | --- |
| manifest | Manifest | Metadata about the parsed document |
| chunks | List[Chunk] | All chunks (text, image, table) |
| full_markdown | str | Complete markdown of the document |
| hierarchy | Any \| None | Structural hierarchy data |
| raw_zip | bytes | Raw ZIP archive for archival |

Convenience Properties

| Property | Type | Description |
| --- | --- | --- |
| text_chunks | List[TextChunk] | Only text chunks |
| image_chunks | List[ImageChunk] | Only image chunks |
| table_chunks | List[TableChunk] | Only table chunks |
| job_id | str \| None | Shortcut to manifest.job_id |
| statistics | Statistics | Shortcut to manifest.statistics |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| getChunk(chunk_id) | Chunk \| None | Find a chunk by ID |
| save(directory) | Path | Save all results to disk (full.md, images/, tables/, result.zip) |
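save(directory) writes the fixed layout listed above. The sketch below reproduces that layout with plain pathlib, for readers who want to persist a subset themselves; the per-file names inside images/ and tables/ are illustrative assumptions:

```python
from pathlib import Path

def save_layout(directory, full_markdown: str, images: dict, tables: dict, raw_zip: bytes) -> Path:
    """Write full.md, images/, tables/, and result.zip under `directory`."""
    root = Path(directory)
    (root / "images").mkdir(parents=True, exist_ok=True)
    (root / "tables").mkdir(parents=True, exist_ok=True)
    (root / "full.md").write_text(full_markdown, encoding="utf-8")
    for name, data in images.items():   # file name -> raw image bytes
        (root / "images" / name).write_bytes(data)
    for name, html in tables.items():   # file name -> table HTML
        (root / "tables" / name).write_text(html, encoding="utf-8")
    (root / "result.zip").write_bytes(raw_zip)
    return root
```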

Manifest

Top-level manifest describing the result ZIP contents.

```python
class Manifest(BaseModel):
    version: Optional[str] = None
    job_id: Optional[str] = None
    data_id: Optional[str] = None
    source_file_name: Optional[str] = None
    processing_date: Optional[str] = None
    checksum: Optional[Checksum] = None
    statistics: Optional[Statistics] = None
    files: Optional[FileIndex] = None
```

Statistics

Aggregate statistics about the parsed document.

```python
class Statistics(BaseModel):
    total_chunks: Optional[int] = 0
    text_chunks: Optional[int] = 0
    image_chunks: Optional[int] = 0
    table_chunks: Optional[int] = 0
    total_pages: Optional[int] = 0
```

Chunk Types

TextChunk

```python
class TextChunk(BaseChunk):
    type: str = "text"
    length: int = 0
    tokens: Optional[int] = None
    keywords: Optional[List[str]] = None
    summary: Optional[str] = None
    relationships: Optional[List[Union[Dict[str, Any], str]]] = None
```

ImageChunk

```python
class ImageChunk(BaseChunk):
    type: str = "image"
    file_path: Optional[str] = None
    original_name: Optional[str] = None
    summary: Optional[str] = None
    data: bytes  # raw image bytes from ZIP
```

| Method / Property | Returns | Description |
| --- | --- | --- |
| save(directory) | Path | Write image bytes to disk |
| format (property) | str \| None | Inferred format from file_path extension |

TableChunk

```python
class TableChunk(BaseChunk):
    type: str = "table"
    file_path: Optional[str] = None
    original_name: Optional[str] = None
    table_type: Optional[str] = None
    summary: Optional[str] = None
    html: str  # table HTML from ZIP
```

| Method | Returns | Description |
| --- | --- | --- |
| save(directory) | Path | Write table HTML to disk |

All chunks share these base fields:

```python
class BaseChunk(BaseModel):
    chunk_id: str
    type: str
    content: str = ""
    path: Optional[str] = None
```
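Because every chunk carries a type discriminator, downstream code can route chunks without isinstance checks. A small sketch of that pattern:

```python
def split_chunks(chunks):
    """Group chunks by their `type` field ("text", "image", "table", ...)."""
    groups = {}
    for chunk in chunks:
        groups.setdefault(chunk.type, []).append(chunk)
    return groups
```

This is essentially what the text_chunks / image_chunks / table_chunks convenience properties expose, one group at a time.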

ParsingParams

Optional parsing parameters for job creation.

```python
class ParsingParams(TypedDict, total=False):
    model: str
    ocr_enabled: bool
    kb_dir: str
    doc_type: str
    smart_title_parse: bool
    summary_image: bool
    summary_table: bool
    summary_txt: bool
    add_frag_desc: bool
```

| Field | Type | Description |
| --- | --- | --- |
| model | str | Parsing model to use (e.g., "advanced") |
| ocr_enabled | bool | Enable OCR for scanned documents |
| doc_type | str | Document type hint |
| smart_title_parse | bool | Enable smart title detection |
| summary_image | bool | Generate image summaries |
| summary_table | bool | Generate table summaries |
| summary_txt | bool | Generate text chunk summaries |
| add_frag_desc | bool | Add fragment descriptions |

WebhookConfig

Webhook configuration for job completion notifications.

```python
class WebhookConfig(TypedDict, total=False):
    url: str
```

| Field | Type | Description |
| --- | --- | --- |
| url | str | HTTPS URL to receive job completion callbacks |

Error Types

All SDK errors inherit from KnowhereError.

KnowhereError

Root exception for every error raised by the SDK.

```python
class KnowhereError(Exception):
    message: str
```

APIStatusError

Raised for HTTP 4xx / 5xx responses. The base class for all HTTP status errors.

```python
class APIStatusError(KnowhereError):
    status_code: int
    code: str
    request_id: Optional[str]
    details: Optional[Any]
    body: Optional[Any]
    response: httpx.Response
```

| Property | Type | Description |
| --- | --- | --- |
| status_code | int | HTTP status code |
| code | str | Server error code (e.g., INVALID_ARGUMENT) |
| request_id | str \| None | Request tracing identifier |
| details | Any \| None | Additional error context |
| body | Any \| None | Raw parsed response body |
| response | httpx.Response | The underlying HTTP response |

RateLimitError

Raised for HTTP 429 — includes rate limit hints from the server.

```python
class RateLimitError(APIStatusError):
    retry_after: Optional[float]
    limit: Optional[int]
    period: Optional[str]
```

| Property | Type | Description |
| --- | --- | --- |
| retry_after | float \| None | Seconds to wait before retrying (None for quota exceeded) |
| limit | int \| None | Maximum allowed requests in the rate window |
| period | str \| None | Rate window unit ("second", "minute", "hour", "day") |
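Callers that want to honor the server's hint beyond the SDK's built-in retries can sleep for retry_after and try again. A sketch (the exception class used in the test is a stand-in exposing the same attribute; sleep is injectable for testing):

```python
import time

def call_honoring_retry_after(fn, max_attempts: int = 3, sleep=time.sleep):
    """Call fn(); on an error exposing retry_after, wait that long and retry."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            retry_after = getattr(exc, "retry_after", None)
            if retry_after is None or attempt == max_attempts - 1:
                raise  # not retryable (e.g. quota exceeded) or attempts exhausted
            sleep(retry_after)
```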

JobFailedError

Raised when polling detects a job has reached the failed status.

```python
class JobFailedError(KnowhereError):
    job_result: JobResult
    code: str
    message: str
```

| Property | Type | Description |
| --- | --- | --- |
| job_result | JobResult | The full job result object |
| code | str | Error code from the job failure |
| message | str | Error message from the job failure |

PollingTimeoutError

Raised when jobs.wait() exceeds the configured timeout.

```python
class PollingTimeoutError(KnowhereError):
    job_id: str
    elapsed: float
```

ChecksumError

Raised when the SHA-256 checksum of a downloaded result does not match.

```python
class ChecksumError(KnowhereError):
    expected: str
    actual: str
```
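The verification itself is a plain SHA-256 comparison against the manifest value. An equivalent sketch using the standard library:

```python
import hashlib

def verify_sha256(data: bytes, expected_hex: str) -> None:
    """Raise on mismatch, mirroring what verify_checksum=True does for the result ZIP."""
    actual = hashlib.sha256(data).hexdigest()
    if actual != expected_hex:
        # The SDK raises ChecksumError carrying expected and actual here.
        raise ValueError(f"checksum mismatch: expected {expected_hex}, got {actual}")
```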

Other Error Types

| Exception | HTTP | Description |
| --- | --- | --- |
| APIConnectionError | | Cannot reach the API (DNS, TCP, TLS) |
| APITimeoutError | | Request exceeded timeout |
| ValidationError | | Invalid arguments passed to the SDK |
| InvalidStateError | | Object in unexpected state (e.g., no upload_url) |
| BadRequestError | 400 | Invalid request parameters |
| AuthenticationError | 401 | Invalid or missing API key |
| PaymentRequiredError | 402 | Insufficient credits |
| PermissionDeniedError | 403 | Access denied |
| NotFoundError | 404 | Resource not found |
| ConflictError | 409 | Concurrency conflict |
| InternalServerError | 500 | Server internal error |
| ServiceUnavailableError | 502/503 | Service temporarily down |
| GatewayTimeoutError | 504 | Gateway timeout |

Automatic Retries

The SDK automatically retries requests on retryable errors with exponential backoff and jitter:

| Error | Retried? | Condition |
| --- | --- | --- |
| Connection errors | ✅ | Always |
| Timeouts | ✅ | Always |
| 409 Conflict | ✅ | Always |
| 502/503 Unavailable | ✅ | Always |
| 504 Gateway Timeout | ✅ | Always |
| 429 Rate Limit | ✅ Conditional | Only when retry_after is present |
| Other 4xx/5xx | ❌ | Never |

Backoff schedule: 0.5s × 2^attempt (capped at 30s), with ±25% jitter. When the server provides a retry_after hint, it takes precedence.

```python
# Reduce retries for faster failure
client = Knowhere(max_retries=2)

# Increase retries for more resilience
client = Knowhere(max_retries=10)
```
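The documented schedule can be computed directly. A sketch of the delay formula (jitter drawn uniformly from ±25%; the rng parameter is injectable so the formula is testable):

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  jitter: float = 0.25, rng=random.random) -> float:
    """Delay before retry `attempt` (0-based): base * 2**attempt, capped, with jitter."""
    delay = min(base * (2 ** attempt), cap)
    return delay * (1 + jitter * (2 * rng() - 1))
```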

Requirements