Mikshi Python SDK
The Mikshi Python SDK is a typed client for the Mikshi Video Understanding Platform. It wraps the HTTP API, sends your API key on every request, and parses every response into a Pydantic model. Long-running work (upload processing, analyse, search-process) is started with one call and tracked by polling a status method.
If you are calling the API directly, see the HTTP API reference instead.
Install
pip install videosdk
Quick start
End-to-end: upload a video, wait for transcoding + HLS, run the analyse and search-process pipelines so the video is queryable, then run a vector search and ask a question about it.
import time
from videosdk import Client

with Client(
    api_key="sk_AnFZjh95_...",
    base_url="https://api.your-host.com",
) as client:
    # 1. Upload: chunks the file, PUTs to S3, completes, and kicks off
    #    transcoding + HLS generation. Returns immediately with a video_id.
    job = client.uploads.upload_file("clip.mp4", name="My Clip")
    print(f"uploaded {job.video_id}")

    # 2. Wait for upload processing (transcode + HLS) to finish.
    while True:
        status = client.uploads.get_status(job.video_id)
        derived = status.derived_status()
        if derived == "completed":
            break
        if derived == "failed":
            raise RuntimeError(status.error_message or "upload processing failed")
        time.sleep(2)

    # 3. Run analyse + search-process. These two pipelines are what make the
    #    video searchable and chat-ready; search/visualize/ask all require them.
    client.videos.initiate_analyse(job.video_id)
    client.videos.initiate_search_process(job.video_id)
    while True:
        a = client.videos.get_analyse_status(job.video_id)
        sp = client.videos.get_search_process_status(job.video_id)
        if a.is_terminal() and sp.is_terminal():
            if a.is_failed() or sp.is_failed():
                raise RuntimeError(f"pipeline failed: analyse={a.status}, search={sp.status}")
            break
        time.sleep(2)

    # 4. Vector search: ranked clip-level hits with pre-signed thumbnails and
    #    the auth-gated HLS URL for each segment.
    hits = client.search.search(
        "person near the door",
        video_ids=[job.video_id],
        top_k=3,
    )
    for hit in hits:
        print(f" #{hit.rank} [{hit.start} - {hit.end}] {hit.hls_playlist_url}")

    # 5. Chat: natural-language Q&A over the same video.
    result = client.chat.ask(
        "Summarize what happens in the video.",
        video_id=job.video_id,
    )
    print(result.answer)
The client is a context manager: use it in a `with` block so the underlying HTTP connection pool is closed cleanly. If you can't, call `client.close()` yourself.
Client
from videosdk import Client
client = Client(
    api_key="sk_...",        # required
    base_url="https://...",  # optional; defaults to http://localhost:8001
    timeout=30.0,            # optional; per-request timeout in seconds
    http_client=None,        # optional; pass a pre-configured httpx.Client
)
The client exposes six resource groups:
| Attribute | Resource |
|---|---|
| `client.uploads` | Multipart, direct, YouTube, status |
| `client.videos` | List, get, delete, processing pipelines |
| `client.collections` | Collection CRUD + membership |
| `client.chat` | Natural-language Q&A over a video |
| `client.search` | Vector search + UMAP visualization |
| `client.embeddings` | Raw segment / text embedding vectors |
Authentication is handled automatically — your key is sent as X-API-Key on every request.
Uploads
Four ways to ingest a video, ordered from highest-level to lowest:
- `upload_file`: recommended. Runs the full multipart flow for you.
- `direct`: single-shot upload for files ≤ 100 MB.
- `import_youtube`: server pulls from a YouTube URL.
- Manual multipart: only if you need retry, parallelism, or resume.
Runnable examples in examples/uploads/:
- `upload_file.py`: high-level multipart helper (recommended)
- `upload_direct.py`: single-shot upload
- `upload_youtube.py`: server-side YouTube import
- `upload_multipart_manual.py`: manual multipart, end-to-end
- `upload_resume.py`: `list_parts` + `abort_multipart` against an in-progress upload
- `estimate_credits.py`: preview the storage credit cost for a file (no network)
| Method | HTTP |
|---|---|
| `client.uploads.upload_file(path, *, name, ...)` | (helper; runs the full multipart flow) |
| `client.uploads.direct(file, *, name, use_dynamic_hls=False)` | POST /api/backend/v1/developer/uploads/direct |
| `client.uploads.import_youtube(url, name=None)` | POST /api/backend/v1/developer/uploads/youtube |
| `client.uploads.initiate_multipart(filename, content_type)` | POST /api/backend/v1/developer/uploads/multipart/initiate |
| `client.uploads.get_part_url(s3_object_name, upload_id, part_number)` | POST /api/backend/v1/developer/uploads/multipart/part-url |
| `client.uploads.list_parts(s3_object_name, upload_id)` | POST /api/backend/v1/developer/uploads/multipart/list-parts |
| `client.uploads.complete_multipart(s3_object_name, upload_id, parts)` | POST /api/backend/v1/developer/uploads/multipart/complete |
| `client.uploads.abort_multipart(s3_object_name, upload_id)` | POST /api/backend/v1/developer/uploads/multipart/abort |
| `client.uploads.preprocess(s3_object_name, video_id, name, use_dynamic_hls=False)` | POST /api/backend/v1/developer/uploads/preprocess |
| `client.uploads.get_status(video_id)` | GET /api/backend/v1/developer/uploads/status/{video_id} |
| `estimate_upload_credits(size_bytes)` (pure helper, top-level import) | (no network; mirrors the server's 1 credit per 25 MiB formula) |
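The credit formula in the last row can be illustrated with a few lines of arithmetic. This is only a sketch, not the SDK's implementation: `sketch_upload_credits` is a hypothetical name, and billing a partial 25 MiB block as a whole credit (rounding up) is an assumption the table does not state.

```python
MIB = 1024 * 1024
BYTES_PER_CREDIT = 25 * MIB  # "1 credit per 25 MiB", per the table


def sketch_upload_credits(size_bytes: int) -> int:
    """Hypothetical stand-in for estimate_upload_credits.

    Assumption: a partial 25 MiB block costs a whole credit (ceiling
    division). The real helper may round differently.
    """
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    return -(-size_bytes // BYTES_PER_CREDIT)  # integer ceiling division


print(sketch_upload_credits(100 * MIB))  # 4
```

For real estimates, call the SDK's own `estimate_upload_credits`, which is documented as matching the server.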
Upload a file (recommended)
For files of any size, upload_file chunks the file, PUTs each chunk to S3, completes the multipart upload, and kicks off processing — all in one call.
job = client.uploads.upload_file("movie.mp4", name="My Movie")
print(job.video_id) # poll get_status(video_id) for processing progress
With progress reporting:
def progress(uploaded, total):
    print(f" {uploaded}/{total} bytes ({uploaded * 100 // total}%)")

job = client.uploads.upload_file(
    "movie.mp4",
    name="My Movie",
    chunk_size=25 * 1024 * 1024,  # 25 MB chunks (default is 10 MB)
    on_progress=progress,
)
What the helper does and doesn't do:
- Sequential chunks (default 10 MB), guesses `content_type` from the extension, `on_progress(uploaded, total)` callback.
- If anything fails before `complete_multipart` succeeds, the helper calls `abort_multipart` so you don't pay for orphaned S3 parts.
- No retry on failed chunks, no parallel uploads, no resume. For any of those, fall back to the low-level methods below.
- If `preprocess` fails after a successful S3 upload, the S3 object is left in place (it's durable at that point). Use the manual flow if you need to stash `s3_object_name` for retry.
Direct upload (≤ 100 MB)
job = client.uploads.direct("movie.mp4", name="My Movie")
print(job.video_id)
file accepts a path (str or Path) or an open binary file-like object. Raises PayloadTooLargeError on 413 — switch to upload_file for larger files.
YouTube import
job = client.uploads.import_youtube("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
On failure the SDK raises YouTubeImportError (a subclass of BadRequestError) with the short code on .code. The full set of known codes is exported as YOUTUBE_FATAL_CODES:
from videosdk import YouTubeImportError
try:
    client.uploads.import_youtube(url)
except YouTubeImportError as e:
    print(f"YouTube import rejected: {e.code}")
    if e.code == "YOUTUBE_VIDEO_PRIVATE":
        ...  # skip
The SDK reads the code from whichever field the server uses (message on the dev API, detail on FastAPI defaults), so you don't need to inspect the response body yourself.
Manual multipart (only if the helper doesn't fit)
Use this when you need retry on failed parts, parallel chunk uploads, resume, or custom chunk handling.
import httpx
init = client.uploads.initiate_multipart("movie.mp4", "video/mp4")
parts = []
with open("movie.mp4", "rb") as f:
    part_number = 1
    while chunk := f.read(10 * 1024 * 1024):  # 10 MB chunks
        p = client.uploads.get_part_url(init.s3_object_name, init.upload_id, part_number)
        r = httpx.put(p.url, content=chunk)
        r.raise_for_status()
        parts.append({"part_number": part_number, "etag": r.headers["ETag"]})
        part_number += 1
client.uploads.complete_multipart(init.s3_object_name, init.upload_id, parts)
job = client.uploads.preprocess(init.s3_object_name, init.video_id, name="My Movie")
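For parallel or resumable uploads it can help to plan the byte ranges up front, so each worker seeks to its own offset. The sketch below is plain Python and makes no SDK calls; `plan_parts` and `PartRange` are hypothetical names. The 5 MiB minimum for non-final parts and the 10,000-part ceiling are S3 multipart limits; whether the platform adds limits of its own is not covered here.

```python
from typing import List, NamedTuple

MIB = 1024 * 1024
S3_MIN_PART = 5 * MIB   # S3 minimum size for every part except the last
S3_MAX_PARTS = 10_000   # S3 maximum number of parts per upload


class PartRange(NamedTuple):
    part_number: int  # 1-based, as passed to get_part_url
    offset: int       # byte offset to seek to
    length: int       # number of bytes to read for this part


def plan_parts(file_size: int, chunk_size: int = 10 * MIB) -> List[PartRange]:
    """Split file_size bytes into consecutive part ranges (hypothetical helper)."""
    if file_size <= 0:
        raise ValueError("file_size must be positive")
    if chunk_size < S3_MIN_PART:
        raise ValueError("chunk_size is below the S3 minimum part size")
    count = -(-file_size // chunk_size)  # ceiling division
    if count > S3_MAX_PARTS:
        raise ValueError("too many parts; increase chunk_size")
    return [
        PartRange(n + 1, n * chunk_size, min(chunk_size, file_size - n * chunk_size))
        for n in range(count)
    ]


for part in plan_parts(25 * MIB):
    print(part)
```

Each range can then be handed to a worker that seeks to `offset`, reads `length` bytes, and PUTs them to the URL returned by `get_part_url` for that `part_number`.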
Poll status
get_status returns per-pipeline state only (no top-level overall_status). Use derived_status() to roll the three pipelines into a single pending / in_progress / completed / failed state:
import time
while True:
    status = client.uploads.get_status(job.video_id)
    derived = status.derived_status()
    if derived in ("completed", "failed"):
        break
    print(
        f" transcode={getattr(status.transcoding_status, 'percentage', 0)}% "
        f"hls={getattr(status.hls_generation_status, 'percentage', 0)}%"
    )
    time.sleep(2)

if derived == "failed":
    raise RuntimeError(status.error_message or "processing failed")
Per-pipeline status values are pending / in_progress / completed / failed.
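The polling loops in this document all share one shape, so a small generic helper may be worth having. The following is a sketch, not part of the SDK: `poll_until` is a hypothetical name, and the backoff factor and default timeout are arbitrary choices. The demo uses a fake status source so it runs without the SDK.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_terminal: Callable[[T], bool],
    *,
    interval: float = 2.0,
    max_interval: float = 30.0,
    timeout: Optional[float] = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() until is_terminal(result) is true, backing off between polls."""
    deadline = None if timeout is None else time.monotonic() + timeout
    delay = interval
    while True:
        result = fetch()
        if is_terminal(result):
            return result
        # Give up if the next sleep would run past the deadline.
        if deadline is not None and time.monotonic() + delay > deadline:
            raise TimeoutError("status did not reach a terminal state in time")
        sleep(delay)
        delay = min(delay * 1.5, max_interval)


# Demo with a fake status source. With the SDK, fetch would wrap
# client.uploads.get_status(video_id) and is_terminal would test derived_status().
states = iter(["pending", "in_progress", "completed"])
final = poll_until(
    lambda: next(states),
    lambda s: s in ("completed", "failed"),
    sleep=lambda _: None,  # skip real sleeping in the demo
)
print(final)  # completed
```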
Videos
Once a video is uploaded, these endpoints manage its lifecycle and run optional processing pipelines (search-process, analyse).
Runnable examples: examples/videos/ — list_videos.py, get_video.py, run_pipeline.py, delete_video.py.
| Method | HTTP |
|---|---|
| `client.videos.list(*, status=..., start_time=..., end_time=..., not_in_collection=..., skip=..., limit=..., sort_by=..., sort_order=...)` | GET /api/backend/v1/developer/videos/list |
| `client.videos.get(video_id)` | GET /api/backend/v1/developer/videos/get-video-details |
| `client.videos.delete(video_id)` | DELETE /api/backend/v1/developer/videos/delete |
| `client.videos.initiate_search_process(video_id)` | POST /api/backend/v1/developer/videos/search-process/initiate |
| `client.videos.get_search_process_status(video_id)` | GET /api/backend/v1/developer/videos/search-process/status/{video_id} |
| `client.videos.initiate_analyse(video_id)` | POST /api/backend/v1/developer/videos/analyse/initiate |
| `client.videos.get_analyse_status(video_id)` | GET /api/backend/v1/developer/videos/analyse/status/{video_id} |
# List with filters (only set what you need; everything is keyword-only)
videos = client.videos.list(status="processed", limit=50, sort_order="asc")
# Single-video details (includes video_metadata)
details = client.videos.get(video_id)
print(details.name, details.video_metadata)
# Kick off the analyse pipeline, then poll
client.videos.initiate_analyse(video_id)
while True:
    s = client.videos.get_analyse_status(video_id)
    if s.is_terminal():
        break
    print(f" analyse {s.percentage or 0}%")  # percentage is None on DB fallback
    time.sleep(2)
# Permanent delete (also removes derived S3 assets)
client.videos.delete(video_id)
status on list is one of uploaded, analysed, search_processed, processed. processed requires both pipelines to be done; the middle two filter on a single pipeline.
The pipeline initiate_* calls return 409 ConflictError if transcoding_status != "completed" — wait for upload processing to finish before kicking these off.
Playing back HLS
details.hls_playlist_path (and the same field on SearchHit, collection videos, etc.) is an auth-gated proxy URL — every manifest and segment fetch the player makes must carry the X-API-Key header. Don't embed the key in client-side code; proxy through your own backend or mint short-lived per-session keys.
See HLS playback in the HTTP reference for player-side wiring (hls.js, Video.js, AVPlayer, ExoPlayer).
Collections
Runnable examples: examples/collections/ — list_collections.py, collections_crud.py.
| Method | HTTP |
|---|---|
| `client.collections.list(page=1, page_size=10)` | GET /api/backend/v1/developer/collections/list |
| `client.collections.create(name, description=None)` | POST /api/backend/v1/developer/collections/create |
| `client.collections.update(id, *, name=None, description=None)` | PATCH /api/backend/v1/developer/collections/update/{id} |
| `client.collections.delete(id)` | DELETE /api/backend/v1/developer/collections/delete/{id} |
| `client.collections.list_videos(id, *, page=1, page_size=10, sort_by="created_at", sort_order="desc")` | GET /api/backend/v1/developer/collections/{id}/videos |
| `client.collections.add_video(id, video_id)` | POST /api/backend/v1/developer/collections/{id}/videos |
| `client.collections.remove_video(id, video_id)` | DELETE /api/backend/v1/developer/collections/{id}/videos/{video_id} |
Notes
- `list` returns a `PaginatedCollections` with `.data`, `.page`, `.page_size`, `.total`. Each collection carries up to four signed thumbnail URLs.
- `list_videos` returns a `PaginatedCollectionVideos` with the same pagination metadata. Collection videos are always fully processed, because `add_video` rejects anything else.
- `update` only sends fields you pass. Calling it with all defaults raises `BadRequestError` from the server.
- `add_video` requires a fully processed video: both `analyse_status == "DONE"` and `search_process_status == "DONE"`. The server returns `400 BadRequestError` if the video isn't ready and `409 ConflictError` if it's already in the collection.
- `delete` removes the collection and its video links; the underlying video records stay intact.
Chat
Natural-language Q&A over a video. Backed by the chat service's retrieval + rerank + LLM pipeline.
Runnable example: examples/chat/ask.py — single-turn ask, with an optional --stream flag for SSE.
| Method | HTTP |
|---|---|
| `client.chat.ask(question, *, video_id, top_k=None, history=None)` | POST /api/chat/v1/developer/chat/ask |
| `client.chat.ask_stream(question, *, video_id, top_k=None, history=None)` | POST /api/chat/v1/developer/chat/ask (SSE, stream=true) |
Notes
- `video_id` is the target video. The caller must own it; unowned IDs raise `PermissionDeniedError` (403) before credit deduction.
- Each call costs 1 credit. If the pipeline fails (5xx), the server automatically refunds the credit before re-raising, so you are not billed for failed asks.
- The server is stateless. To maintain a multi-turn conversation, the caller owns the history and resends prior turns on every call.
- `history` is an OpenAI-style list of `{"role": "user"|"assistant", "content": str}` dicts (or `ChatMessage` instances). It is not `(user, assistant)` tuples.
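If your application already stores conversations as `(user, assistant)` pairs, a small adapter can produce the dict form described in the last note. This is a sketch in plain Python; `pairs_to_history` is a hypothetical helper, not an SDK function.

```python
from typing import Dict, Iterable, List, Tuple


def pairs_to_history(pairs: Iterable[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Flatten (user, assistant) pairs into role/content dicts, oldest first."""
    history: List[Dict[str, str]] = []
    for user_text, assistant_text in pairs:
        history.append({"role": "user", "content": user_text})
        history.append({"role": "assistant", "content": assistant_text})
    return history


print(pairs_to_history([("Who enters first?", "A man in a red jacket.")]))
```

The returned list is in the shape the notes describe for the `history` argument of `client.chat.ask`.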
Single-turn
result = client.chat.ask(
    "Describe what happens in this video.",
    video_id="a249266f-052d-4148-bc2f-3efa76dc4269",
)
print(result.answer)
Multi-turn
VIDEO_ID = "a249266f-052d-4148-bc2f-3efa76dc4269"
history: list[dict] = []
def turn(question: str) -> str:
    result = client.chat.ask(question, video_id=VIDEO_ID, history=history)
    history.append({"role": "user", "content": question})
    history.append({"role": "assistant", "content": result.answer})
    return result.answer

print(turn("Describe what happens in this video."))
print(turn("How many people are involved?"))
print(turn("What was my first question?"))
You can also pass typed ChatMessage instances — the SDK serializes either form to the same wire payload.
top_k
top_k controls how many segments the retriever pulls before rerank/LLM. Omit it to use the server default (5). Bumping it gives the LLM more context but raises latency:
client.chat.ask(
    "Walk me through every event in chronological order.",
    video_id=VIDEO_ID,
    top_k=20,
)
Streaming (SSE)
ask_stream yields one AskTokenEvent per LLM chunk, then either a single AskDoneEvent (full answer attached) or AskErrorEvent (mid-stream failure; credit auto-refunded). Use isinstance to dispatch on the event type:
import sys
from videosdk import AskTokenEvent, AskDoneEvent, AskErrorEvent
for event in client.chat.ask_stream(
    "How many people are involved?",
    video_id=VIDEO_ID,
):
    if isinstance(event, AskTokenEvent):
        sys.stdout.write(event.content)
        sys.stdout.flush()
    elif isinstance(event, AskDoneEvent):
        print()  # full answer is also on event.answer
    elif isinstance(event, AskErrorEvent):
        print(f"\n[error] {event.detail}")
        break
The stream terminates after the done or error event — the iterator stops on its own.
Search
Two endpoints power semantic search and visualization over segment embeddings. Both run against videos that have completed the analyse and search-process pipelines.
Runnable examples: examples/search/ — search.py, visualize.py.
| Method | HTTP |
|---|---|
| `client.search.search(query, *, video_ids=None, collection_id=None, top_k=5)` | POST /api/chat/v1/developer/search |
| `client.search.visualize(*, video_ids=None, collection_id=None)` | POST /api/chat/v1/developer/visualize |
Notes
- You must provide at least one of `video_ids` or `collection_id`. Passing neither raises `UnprocessableEntityError` from the server.
- The caller must own every video in the request. Unowned IDs raise `PermissionDeniedError` (403).
- `search` costs 1 credit per call (refunded on failure). `visualize` is free.
- `visualize` needs at least 2 segment embeddings across the selection (UMAP requires ≥2 points). Fewer raises `BadRequestError`.
Vector search
Returns the top_k ranked clip-level hits. Each SearchHit carries pre-signed thumbnail_url and video_url, plus the auth-gated hls_playlist_url.
hits = client.search.search(
    "person walking near the gate",
    video_ids=["b2f1...-1111", "b2f1...-2222"],
    top_k=5,
)
for hit in hits:
    print(
        f"#{hit.rank} [{hit.start} - {hit.end}] "
        f"video={hit.video_id}\n"
        f" hls: {hit.hls_playlist_url}\n"
        f" thumb: {hit.thumbnail_url}"
    )
You can scope to a collection instead:
hits = client.search.search("forklift incident", collection_id=coll.collection_id)
Or both together — videos and collection are unioned server-side.
Visualization
Returns a 2-D UMAP projection (cosine metric, MinMax-scaled to [0, 1]) of every segment across the requested videos, plus per-video color/HLS/signed URLs and per-segment thumbnails. Useful for plotting an embedding map in a frontend.
viz = client.search.visualize(collection_id="3fa8...")
print(len(viz.videos), "videos,", len(viz.points), "points")
for video_id, info in viz.videos.items():
    print(video_id, "color=", info.color)
for p in viz.points[:5]:
    print(f" ({p.x:.4f}, {p.y:.4f}) segment={p.segment_id} start={p.start}")
The response shape is:
class VisualizationVideo:
    video_id: UUID
    color: Optional[str]  # e.g. "hsl(217, 70%, 55%)"
    hls_playlist_url: Optional[str]
    video_url: Optional[str]

class VisualizationPoint:
    segment_id: str
    video_id: UUID
    x: float  # 0.0 - 1.0
    y: float  # 0.0 - 1.0
    start: str  # "HH:MM:SS"
    end: str  # "HH:MM:SS"
    thumbnail_url: Optional[str]

class Visualization:
    videos: Dict[str, VisualizationVideo]  # keyed by video_id (str)
    points: List[VisualizationPoint]
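Because `x` and `y` are scaled to [0, 1], plotting them means mapping each point onto your canvas. A minimal sketch, assuming a canvas whose origin is the top-left corner with y growing downward (as in HTML canvas and most image libraries); `to_pixels` is a hypothetical helper, not part of the SDK.

```python
from typing import Tuple


def to_pixels(x: float, y: float, width: int, height: int, margin: int = 0) -> Tuple[int, int]:
    """Map a point in [0, 1] x [0, 1] onto a width x height canvas.

    Assumption: the canvas origin is top-left, so y is flipped to keep
    larger y values higher on screen.
    """
    usable_w = width - 2 * margin
    usable_h = height - 2 * margin
    px = margin + round(x * usable_w)
    py = margin + round((1.0 - y) * usable_h)
    return px, py


print(to_pixels(0.0, 0.0, 800, 600))  # (0, 600): bottom-left corner
print(to_pixels(1.0, 1.0, 800, 600))  # (800, 0): top-right corner
```

With a real response you would call this once per entry in `viz.points`, passing `p.x` and `p.y`.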
Embeddings
Two endpoints expose the raw vectors used by search and chat: the stored per-segment vectors for a video (read directly from the vector store, no recompute), and the embedding of an arbitrary text string with the active backend.
Runnable examples: examples/embeddings/ — video_embeddings.py, text_embedding.py.
| Method | HTTP |
|---|---|
| `client.embeddings.video(video_id)` | GET /api/chat/v1/developer/embeddings/video/{video_id} |
| `client.embeddings.text(text)` | POST /api/chat/v1/developer/embeddings/text |
Notes
- Both calls cost 1 credit (refunded on 404 / 408 / 502 / 5xx).
- `video` reads from the vector store. If the video's `search-process` pipeline hasn't finished yet, the server returns `409` and the SDK raises `ConflictError`; wait until `get_search_process_status(video_id).is_done()` and retry.
- `text` must be 1–4096 characters.
Video embeddings
resp = client.embeddings.video("a249266f-052d-4148-bc2f-3efa76dc4269")
print(resp.status, resp.id, len(resp.data), "segments")
for seg in resp.data:
    print(
        f" [{seg.start} - {seg.end}] "
        f"option={seg.embedding_option} scope={seg.embedding_scope} "
        f"dims={len(seg.embedding)}"
    )
Text embeddings
resp = client.embeddings.text("chain wearing man")
[vector] = resp.data
print(f"dims={len(vector.embedding)} status={resp.status}")
data is a list to keep the wire shape consistent with video, but the text endpoint always returns exactly one vector. Use a destructuring assignment as above if you want a single value.
Partner integrations
The two embedding endpoints make it straightforward to plug the platform's video embeddings into your own vector database. Every integration follows the same two steps:
- Once per video, after `search-process` finishes: call `client.embeddings.video(video_id)` and upsert the returned per-segment vectors into your DB.
- At query time: call `client.embeddings.text(query)` and run a top-K similarity query against your DB with the returned vector.
Each section below is self-contained — both SDK calls (client.embeddings.video and client.embeddings.text) appear inline next to the DB-specific code.
Notes that apply to every DB:
- Vector dimension: 1024 with the current backend.
- Metric: the platform stores vectors normalized for cosine similarity. Configure your DB to match (cosine, or dot-product on normalized vectors).
- Idempotent upserts: generate each row id deterministically from `(video_id, start, end)` so re-ingesting the same video is safe. The snippets below use `uuid.uuid5` for this; `uuid.uuid4()` would re-create rows every call.
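The difference between the two UUID flavors is easy to check with the standard library alone. The sketch below uses the same `video_id|start|end` key format as the notes describe; `segment_id` is a hypothetical helper name and the timestamps are made-up sample values.

```python
import uuid


def segment_id(video_id: str, start: str, end: str) -> str:
    """Deterministic row id for one segment, derived from its identifying fields."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{video_id}|{start}|{end}"))


first = segment_id("a249266f-052d-4148-bc2f-3efa76dc4269", "00:00:00", "00:00:10")
again = segment_id("a249266f-052d-4148-bc2f-3efa76dc4269", "00:00:00", "00:00:10")
other = segment_id("a249266f-052d-4148-bc2f-3efa76dc4269", "00:00:10", "00:00:20")

print(first == again)  # True: re-ingesting maps to the same row
print(first == other)  # False: a different segment gets a different id
print(uuid.uuid4() == uuid.uuid4())  # False: random ids differ on every call
```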
Pinecone
Install: pip install pinecone (published as pinecone-client before the package was renamed).
import uuid
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="...")
pc.create_index(
    name="video-segments",
    dimension=1024,
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
index = pc.Index("video-segments")

# 1. Ingest: pull segment embeddings from our SDK, upsert into Pinecone.
resp = client.embeddings.video(video_id)
index.upsert(vectors=[
    (
        str(uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}")),
        seg.embedding,
        {"video_id": str(resp.id), "start": seg.start, "end": seg.end},
    )
    for seg in resp.data
])

# 2. Query: embed the user's text with our SDK, search Pinecone.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
hits = index.query(vector=qv.embedding, top_k=5, include_metadata=True)
for m in hits.matches:
    print(m.score, m.metadata["video_id"], m.metadata["start"])
Qdrant
Install: pip install qdrant-client.
import uuid
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

qd = QdrantClient(url="http://localhost:6333")
# Note: recreate_collection drops any existing collection with this name.
qd.recreate_collection(
    collection_name="video-segments",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
)

# 1. Ingest.
resp = client.embeddings.video(video_id)
qd.upsert(
    collection_name="video-segments",
    points=[
        PointStruct(
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}")),
            vector=seg.embedding,
            payload={"video_id": str(resp.id), "start": seg.start, "end": seg.end},
        )
        for seg in resp.data
    ],
)

# 2. Query.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
hits = qd.search(
    collection_name="video-segments",
    query_vector=qv.embedding,
    limit=5,
    with_payload=True,
)
for h in hits:
    print(h.score, h.payload["video_id"], h.payload["start"])
Weaviate
Install: pip install weaviate-client.
import uuid
import weaviate
from weaviate.classes.config import Configure, Property, DataType
from weaviate.classes.query import MetadataQuery

client_wv = weaviate.connect_to_local()
client_wv.collections.create(
    name="VideoSegment",
    vectorizer_config=Configure.Vectorizer.none(),  # we supply our own vectors
    properties=[
        Property(name="video_id", data_type=DataType.TEXT),
        Property(name="start", data_type=DataType.TEXT),
        Property(name="end", data_type=DataType.TEXT),
    ],
)
coll = client_wv.collections.get("VideoSegment")

# 1. Ingest: Weaviate accepts our vector directly via `vector=`. Passing a
#    deterministic `uuid=` keeps re-ingestion idempotent.
resp = client.embeddings.video(video_id)
with coll.batch.dynamic() as batch:
    for seg in resp.data:
        batch.add_object(
            properties={
                "video_id": str(resp.id),
                "start": seg.start,
                "end": seg.end,
            },
            uuid=uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}"),
            vector=seg.embedding,
        )

# 2. Query.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
results = coll.query.near_vector(
    near_vector=qv.embedding,
    limit=5,
    return_metadata=MetadataQuery(distance=True),
)
for obj in results.objects:
    print(obj.metadata.distance, obj.properties["video_id"], obj.properties["start"])

client_wv.close()  # release the connection when done
Chroma
Install: pip install chromadb.
import uuid
import chromadb

chroma = chromadb.PersistentClient(path="./chroma-db")
coll = chroma.get_or_create_collection(
    name="video-segments",
    metadata={"hnsw:space": "cosine"},
)

# 1. Ingest.
resp = client.embeddings.video(video_id)
coll.upsert(
    ids=[
        str(uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}"))
        for seg in resp.data
    ],
    embeddings=[seg.embedding for seg in resp.data],
    metadatas=[
        {"video_id": str(resp.id), "start": seg.start, "end": seg.end}
        for seg in resp.data
    ],
)

# 2. Query.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
hits = coll.query(query_embeddings=[qv.embedding], n_results=5)
for id_, dist, meta in zip(hits["ids"][0], hits["distances"][0], hits["metadatas"][0]):
    print(dist, meta["video_id"], meta["start"])
pgvector (Postgres)
Install: pip install "psycopg[binary]" pgvector.
import uuid
import psycopg
from pgvector.psycopg import register_vector

conn = psycopg.connect("dbname=video")
# The extension must exist before register_vector can look up the vector type.
conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
register_vector(conn)
conn.execute("""
    CREATE TABLE IF NOT EXISTS video_segments (
        segment_id uuid PRIMARY KEY,
        video_id uuid NOT NULL,
        start_ts text NOT NULL,
        end_ts text NOT NULL,
        embedding vector(1024)
    );
    CREATE INDEX IF NOT EXISTS video_segments_vec_idx
        ON video_segments USING hnsw (embedding vector_cosine_ops);
""")

# 1. Ingest.
resp = client.embeddings.video(video_id)
with conn.cursor() as cur:
    for seg in resp.data:
        cur.execute(
            """
            INSERT INTO video_segments (segment_id, video_id, start_ts, end_ts, embedding)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (segment_id) DO UPDATE SET embedding = EXCLUDED.embedding
            """,
            (
                uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}"),
                resp.id,
                seg.start,
                seg.end,
                seg.embedding,
            ),
        )
conn.commit()

# 2. Query: `<=>` is cosine distance (smaller = closer). The ::vector cast
#    lets a plain Python list be compared against the vector column.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT segment_id, video_id, start_ts, end_ts, embedding <=> %s::vector AS distance
        FROM video_segments
        ORDER BY embedding <=> %s::vector
        LIMIT 5
        """,
        (qv.embedding, qv.embedding),
    )
    for segment_id, video_id, start_ts, end_ts, distance in cur.fetchall():
        print(distance, video_id, start_ts)
Resolving a hit back to a playable clip
Whichever DB you use, the payload carries video_id and start, which is enough to call back into the platform for the HLS playlist URL:
def hit_to_playable(video_id: str, start: str):
    details = client.videos.get(video_id)
    return f"{details.hls_playlist_path}#t={start}"
details.hls_playlist_path is the auth-gated proxy URL — see Playing back HLS for the player wiring.
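Some players seek by a numeric offset in seconds (for example, the `currentTime` property of an HTML media element) rather than a URL fragment. A small converter covers that case. This is a sketch that assumes timestamps always arrive as whole-second `HH:MM:SS` strings, as the models in this document show; `hms_to_seconds` is a hypothetical helper.

```python
def hms_to_seconds(timestamp: str) -> int:
    """Convert an "HH:MM:SS" string to whole seconds."""
    # Unpacking raises ValueError if there are not exactly three fields.
    hours, minutes, seconds = (int(part) for part in timestamp.split(":"))
    if hours < 0 or not (0 <= minutes < 60 and 0 <= seconds < 60):
        raise ValueError(f"not a valid HH:MM:SS timestamp: {timestamp!r}")
    return hours * 3600 + minutes * 60 + seconds


print(hms_to_seconds("00:01:30"))  # 90
```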
Errors
All errors inherit from VideoSDKError. HTTP status codes map to specific subclasses so you can catch what matters:
| HTTP | Exception |
|---|---|
| 400 | BadRequestError — YouTubeImportError is a subclass raised by uploads.import_youtube with the short code on .code |
| 401 | AuthenticationError |
| 403 | PermissionDeniedError |
| 404 | NotFoundError |
| 409 | ConflictError |
| 413 | PayloadTooLargeError |
| 422 | UnprocessableEntityError |
| 429 | RateLimitError (sets .retry_after from the Retry-After header) |
| 500 | InternalServerError |
| 502 | BadGatewayError |
| 503 | ServiceUnavailableError |
| other | APIError |
Every exception has .status_code and .response_body attributes.
from videosdk import Client, ConflictError, RateLimitError
import time
try:
    client.collections.create(name="already-taken")
except ConflictError:
    print("name is taken")
except RateLimitError as e:
    if e.retry_after:
        time.sleep(e.retry_after)
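The example above sleeps once but does not retry the call. A retry wrapper that honors `.retry_after` might look like the sketch below. It is not part of the SDK: `call_with_retry` is a hypothetical helper, and `FakeRateLimitError` is a stand-in for `RateLimitError` so the block runs without the SDK installed.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class FakeRateLimitError(Exception):
    """Stand-in for videosdk.RateLimitError (assumed to expose .retry_after)."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_retry(
    fn: Callable[[], T],
    *,
    retry_on: type = FakeRateLimitError,
    max_attempts: int = 3,
    default_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run fn(), retrying on retry_on and honoring .retry_after when it is set."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(getattr(exc, "retry_after", None) or default_delay)
    raise AssertionError("unreachable")


calls = {"n": 0}


def flaky() -> str:
    # Fails twice with a rate-limit error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeRateLimitError(retry_after=0.01)
    return "ok"


print(call_with_retry(flaky, sleep=lambda _: None))  # ok
```

With the SDK, pass `retry_on=RateLimitError` and wrap the client call in a lambda.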
Models
Every response is parsed into a Pydantic model. Models accept extra fields (extra="allow") so additive server changes won't break your client.
UploadJob
Returned by direct, import_youtube, preprocess, and upload_file. The server's 202 response is just {"video_id": "..."} — poll get_status(video_id) to track processing.
class UploadJob:
    video_id: UUID
UploadStatus
Returned by client.uploads.get_status(video_id). Per-pipeline only — there is no top-level overall_status. Call derived_status() to collapse the three pipelines into a single state.
class TaskStatus:
    status: str  # "pending" | "in_progress" | "completed" | "failed"
    percentage: int = 0
    error: Optional[str]

class UploadStatus:
    video_id: UUID
    thumbnail_generation_status: Optional[TaskStatus]
    transcoding_status: Optional[TaskStatus]
    hls_generation_status: Optional[TaskStatus]
    error_message: Optional[str]

    def derived_status(self) -> str:
        """Returns 'pending' | 'in_progress' | 'completed' | 'failed':
        'failed' if any pipeline failed, 'completed' if all three did,
        'in_progress' if any has started, 'pending' otherwise."""
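The roll-up rules in that docstring can be written out as a standalone function. This is a sketch of the documented rules, not the SDK's source; `sketch_derived_status` is a hypothetical name, and treating a missing (`None`) pipeline status as pending is an assumption.

```python
from typing import Iterable, Optional


def sketch_derived_status(statuses: Iterable[Optional[str]]) -> str:
    """Roll per-pipeline states into one, following the docstring's rules.

    Assumption: a pipeline whose status is None counts as "pending".
    """
    states = [s or "pending" for s in statuses]
    if any(s == "failed" for s in states):
        return "failed"
    if states and all(s == "completed" for s in states):
        return "completed"
    # A pipeline has "started" once it is in progress or already finished.
    if any(s in ("in_progress", "completed") for s in states):
        return "in_progress"
    return "pending"


print(sketch_derived_status(["completed", "in_progress", "pending"]))  # in_progress
print(sketch_derived_status(["completed", "completed", "failed"]))     # failed
```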
Multipart helpers
class MultipartInitiate:
    upload_id: str
    s3_object_name: str
    video_id: UUID

class PartUrl:
    url: str
    part_number: int

class UploadedPart:
    part_number: int
    etag: str

class CompletedMultipart:
    s3_object_name: str
    upload_id: str
    etag: str
Video
| Field | Type | Notes |
|---|---|---|
| `video_id` | `UUID` | |
| `name` | `str` | |
| `status` | `Optional[str]` | |
| `video_path` | `Optional[str]` | Pre-signed. |
| `thumbnail_path` | `Optional[str]` | Pre-signed. |
| `hls_playlist_path` | `Optional[str]` | Auth-gated proxy URL. |
| `transcoding_status` | `Optional[str]` | |
| `hls_generation_status` | `Optional[str]` | |
| `thumbnail_generation_status` | `Optional[str]` | |
| `created_at` | `Optional[datetime]` | |
VideoListItem / VideoDetails
Returned by client.videos.list and client.videos.get respectively. VideoDetails adds a video_metadata: Optional[Dict[str, Any]] field for codec/duration/dimensions data.
PipelineEnqueued / PipelineStatus
Returned by client.videos.initiate_* and client.videos.get_*_status.
class PipelineEnqueued:
    video_id: UUID
    status: str  # "enqueued"

class PipelineStatus:
    video_id: UUID
    status: str  # see note below; not a fixed enum
    percentage: Optional[int]  # None on DB fallback (no Redis hit)
    error: Optional[str]

    def is_done(self) -> bool: ...  # status in {"done", "success", "completed"}
    def is_failed(self) -> bool: ...  # status in {"failed", "error"}
    def is_terminal(self) -> bool: ...  # is_done() or is_failed()
The two pipelines use different status vocabularies server-side: analyse returns "done" on success, search-process returns "success". Use s.is_done() / s.is_failed() / s.is_terminal() for polling loops instead of comparing the string directly.
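To see why the helpers are safer than string comparison, here is a standalone sketch of the same classification using the status sets listed in the comments above. `classify` is a hypothetical function, and the case-insensitive comparison is an assumption; this document does not say how the SDK treats other casings.

```python
DONE_STATES = {"done", "success", "completed"}
FAILED_STATES = {"failed", "error"}


def classify(status: str) -> str:
    """Return "done", "failed", or "running" for a raw pipeline status string.

    Assumption: comparison is case-insensitive and ignores surrounding
    whitespace; any status outside the two sets is treated as still running.
    """
    normalized = status.strip().lower()
    if normalized in DONE_STATES:
        return "done"
    if normalized in FAILED_STATES:
        return "failed"
    return "running"


print(classify("done"), classify("success"), classify("error"), classify("enqueued"))
```

A loop that checked only `status == "done"` would never terminate for search-process, which reports `"success"`.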
Collection
| Field | Type | Notes |
|---|---|---|
| `collection_id` | `UUID` | |
| `name` | `str` | |
| `description` | `Optional[str]` | |
| `parent_id` | `Optional[UUID]` | The owning user's id. |
| `parent_type` | `Optional[str]` | Currently always "user". |
| `created_at` | `datetime` | |
| `video_count` | `int` | Defaults to 0. |
| `thumbnails` | `Optional[List[str]]` | Up to four signed thumbnail URLs. |
PaginatedCollections
Returned by client.collections.list.
class PaginatedCollections:
    data: List[Collection]
    page: int
    page_size: int
    total: int
PaginatedCollectionVideos
Returned by client.collections.list_videos. data is a list of Video objects, each carrying pre-signed video_path, thumbnail_path, and the auth-gated hls_playlist_path.
class PaginatedCollectionVideos:
    data: List[Video]
    page: int
    page_size: int
    total: int
AskResponse
Returned by client.chat.ask.
class AskResponse:
    answer: str
The model uses extra="allow" so any future server-side fields (e.g. citations, retrieved segments) will be available via attribute access without an SDK upgrade.
SearchHit
Returned by client.search.search. start / end are HH:MM:SS timestamp strings.
class SearchHit:
    rank: int
    start: str  # "HH:MM:SS"
    end: str  # "HH:MM:SS"
    video_id: UUID
    thumbnail_url: Optional[str]
    hls_playlist_url: Optional[str]
    video_url: Optional[str]
Visualization
Returned by client.search.visualize. See the Search section for full field details.
class VisualizationVideo:
    video_id: UUID
    color: Optional[str]
    hls_playlist_url: Optional[str]
    video_url: Optional[str]

class VisualizationPoint:
    segment_id: str
    video_id: UUID
    x: float
    y: float
    start: str  # "HH:MM:SS"
    end: str  # "HH:MM:SS"
    thumbnail_url: Optional[str]

class Visualization:
    videos: Dict[str, VisualizationVideo]
    points: List[VisualizationPoint]
VideoEmbeddingsResponse
Returned by client.embeddings.video.
class VideoSegmentEmbedding:
    embedding: List[float]
    embedding_option: Optional[str]  # e.g. "visual"
    embedding_scope: Optional[str]  # e.g. "clip"
    start: Optional[str]  # "HH:MM:SS"
    end: Optional[str]  # "HH:MM:SS"

class VideoEmbeddingsResponse:
    id: UUID  # echo of the requested video_id
    status: str  # "ready"
    data: List[VideoSegmentEmbedding]
TextEmbeddingResponse
Returned by client.embeddings.text. data is a single-element list — the wire shape mirrors VideoEmbeddingsResponse for consistency.
class TextEmbedding:
    embedding: List[float]

class TextEmbeddingResponse:
    status: str  # "ready"
    data: List[TextEmbedding]
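Because both responses expose plain List[float] vectors, you can compare them client-side. The sketch below ranks segment vectors against a query vector by cosine similarity. It assumes the text and video embeddings share one vector space and dimension, which this reference does not confirm, and the function names are illustrative rather than part of the SDK.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_segments(
    query: Sequence[float], segments: Sequence[Sequence[float]]
) -> List[Tuple[int, float]]:
    """Return (segment_index, score) pairs, best match first."""
    scored = [(i, cosine_similarity(query, seg)) for i, seg in enumerate(segments)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)
```

A typical call would pass text_response.data[0].embedding as the query and [s.embedding for s in video_response.data] as the segments.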
Recipes
End-to-end workflows that combine multiple SDK calls. Every recipe assumes you've already constructed a Client:
from videosdk import Client
client = Client(api_key="sk_...", base_url="https://api.your-host.com")
Upload recipes
Upload a file and play it back
The full happy path: ingest a file, wait for transcoding + HLS to finish, hand the playlist URL to a player.
import time

def upload_and_wait(path: str, name: str, poll_interval: float = 2.0):
    job = client.uploads.upload_file(path, name=name)
    print(f"queued {job.video_id}")
    while True:
        status = client.uploads.get_status(job.video_id)
        derived = status.derived_status()
        if derived == "completed":
            break
        if derived == "failed":
            raise RuntimeError(f"upload failed: {status.error_message}")
        print(
            f" transcode={getattr(status.transcoding_status, 'percentage', 0)}% "
            f"hls={getattr(status.hls_generation_status, 'percentage', 0)}%"
        )
        time.sleep(poll_interval)
    return client.videos.get(job.video_id)

video = upload_and_wait("clip.mp4", name="My Clip")
print("HLS playlist:", video.hls_playlist_path)
video.hls_playlist_path is the auth-gated proxy URL — see Playing back HLS for the player wiring.
Resumable multipart upload
The upload_file helper aborts on any failure. If you need to survive crashes (e.g. a long upload over a flaky network), drive multipart yourself and persist the upload_id so you can resume from where list_parts says you left off.
import json
import httpx
from pathlib import Path

CHUNK = 10 * 1024 * 1024  # 10 MB
STATE_FILE = Path(".upload-state.json")

def resume_or_start(local_path: str, name: str):
    file_path = Path(local_path)
    if STATE_FILE.exists():
        state = json.loads(STATE_FILE.read_text())
        print(f"resuming upload {state['upload_id']}")
    else:
        init = client.uploads.initiate_multipart(file_path.name, "video/mp4")
        state = {
            "upload_id": init.upload_id,
            "s3_object_name": init.s3_object_name,
            "video_id": str(init.video_id),
        }
        STATE_FILE.write_text(json.dumps(state))

    # Ask the server which parts it already has, then upload only the rest.
    already = client.uploads.list_parts(state["s3_object_name"], state["upload_id"])
    done_part_numbers = {p.part_number for p in already}
    parts = [p.model_dump() for p in already]

    with file_path.open("rb") as f, httpx.Client(timeout=300.0) as s3:
        part_number = 1
        while chunk := f.read(CHUNK):
            if part_number not in done_part_numbers:
                p = client.uploads.get_part_url(
                    state["s3_object_name"], state["upload_id"], part_number
                )
                r = s3.put(p.url, content=chunk)
                r.raise_for_status()
                parts.append({"part_number": part_number, "etag": r.headers["ETag"]})
            part_number += 1

    parts.sort(key=lambda p: p["part_number"])
    client.uploads.complete_multipart(
        state["s3_object_name"], state["upload_id"], parts
    )
    job = client.uploads.preprocess(
        state["s3_object_name"], state["video_id"], name=name
    )
    STATE_FILE.unlink()
    return job

resume_or_start("big-movie.mp4", "Big Movie")
Real implementations should seek (f.seek) past already-uploaded byte ranges instead of reading and discarding every chunk.
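One way to do that is to compute the byte range of each missing part up front and read only those ranges. The sketch below assumes every part except the last was uploaded with the same fixed chunk size used on the first attempt. The helper name missing_part_ranges is hypothetical and not part of the SDK.

```python
from typing import Iterator, Set, Tuple


def missing_part_ranges(
    file_size: int, chunk_size: int, done: Set[int]
) -> Iterator[Tuple[int, int, int]]:
    """Yield (part_number, offset, length) for each part not yet uploaded.

    Part numbers start at 1; `done` holds the part numbers the server
    already reported.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    part_number = 1
    offset = 0
    while offset < file_size:
        length = min(chunk_size, file_size - offset)
        if part_number not in done:
            yield part_number, offset, length
        part_number += 1
        offset += length
```

Inside the upload loop you would then call f.seek(offset) followed by f.read(length) for each yielded tuple, so completed parts are never read from disk.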
YouTube import with error branching
Branch on YouTubeImportError.code so private/age-restricted videos are skipped instead of crashing a batch import.
from videosdk import YouTubeImportError

def safe_youtube_import(url: str):
    try:
        return client.uploads.import_youtube(url)
    except YouTubeImportError as e:
        if e.code == "YOUTUBE_VIDEO_PRIVATE":
            print("video is private, skipping")
            return None
        if e.code == "YOUTUBE_VIDEO_AGE_RESTRICTED":
            print("age-restricted, skipping")
            return None
        raise
See YouTube import for the full set of fatal codes (also available as videosdk.YOUTUBE_FATAL_CODES).
Video recipes
Pipeline orchestration: backfill missing analysis
Find every video that's been transcoded but hasn't run through analyse or search-process, and kick off both pipelines.
from concurrent.futures import ThreadPoolExecutor

def backfill_pipelines():
    pending = [
        v for v in client.videos.list(limit=500)
        if v.transcoding_status == "completed"
        and (v.analyse_status != "done" or v.search_process_status != "done")
    ]
    print(f"backfilling {len(pending)} videos")

    def kick_off(v):
        if v.analyse_status != "done":
            client.videos.initiate_analyse(v.video_id)
        if v.search_process_status != "done":
            client.videos.initiate_search_process(v.video_id)

    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(kick_off, pending))
    return [v.video_id for v in pending]

backfill_pipelines()
Pair with a small polling loop that hits get_analyse_status / get_search_process_status if you want to wait for them all to finish.
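A minimal sketch of such a polling loop follows. It is deliberately SDK-agnostic: the is_terminal callback is an assumption standing in for whatever check you build on get_analyse_status and get_search_process_status, and the timeout is an addition the original recipe does not mention.

```python
import time
from typing import Callable, Iterable, Set


def wait_for_all(
    ids: Iterable[str],
    is_terminal: Callable[[str], bool],
    *,
    poll_interval: float = 2.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Set[str]:
    """Poll until is_terminal(id) is true for every id or the timeout passes.

    Returns the ids that were still pending when the loop stopped
    (an empty set means everything finished).
    """
    pending = set(ids)
    deadline = clock() + timeout
    while pending:
        # Re-check only the ids that have not finished yet.
        pending = {i for i in pending if not is_terminal(i)}
        if not pending or clock() >= deadline:
            break
        sleep(poll_interval)
    return pending
```

For the backfill case, is_terminal could be a small function that fetches both statuses for a video and returns True only when a.is_terminal() and sp.is_terminal() both hold. Pass it the list returned by backfill_pipelines().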
Walk videos in a date range
videos.list uses skip/limit (not page). Iterate by incrementing skip until a short page tells you you've reached the end.
from datetime import datetime, timezone, timedelta

def recent_videos(limit_per_call: int = 100):
    week_ago = datetime.now(tz=timezone.utc) - timedelta(days=7)
    skip = 0
    while True:
        batch = client.videos.list(
            start_time=week_ago,
            sort_by="created_at",
            sort_order="desc",
            skip=skip,
            limit=limit_per_call,
        )
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_call:
            return
        skip += limit_per_call

for video in recent_videos():
    print(video.video_id, video.name, video.created_at)
Collection recipes
Bulk-upload a folder into a new collection
Create a collection, upload every file, run analyse + search-process (since add_video requires both pipelines DONE), then add each video to the collection.
import time
from pathlib import Path

def wait_pipelines(video_id):
    """Block until both analyse and search-process finish."""
    client.videos.initiate_analyse(video_id)
    client.videos.initiate_search_process(video_id)
    while True:
        a = client.videos.get_analyse_status(video_id)
        s = client.videos.get_search_process_status(video_id)
        if a.is_terminal() and s.is_terminal():
            if a.is_failed() or s.is_failed():
                raise RuntimeError(f"pipeline failed: analyse={a.status}, search={s.status}")
            return
        time.sleep(2)

def bulk_upload_to_collection(folder: str, collection_name: str):
    coll = client.collections.create(name=collection_name)
    print(f"created collection {coll.collection_id}")
    for path in sorted(Path(folder).glob("*.mp4")):
        video = upload_and_wait(str(path), name=path.stem)  # see "Upload a file and play it back"
        wait_pipelines(video.video_id)
        client.collections.add_video(coll.collection_id, video.video_id)
        print(f" added {path.name}")
    return coll

bulk_upload_to_collection("./footage", "Warehouse - April")
add_video requires analyse_status == "DONE" AND search_process_status == "DONE". Skipping the pipeline step will get you a 400 BadRequestError. For real workloads, run the uploads concurrently with concurrent.futures.ThreadPoolExecutor — Client is built on httpx.Client, which is thread-safe for separate request calls.
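The concurrent variant can be sketched with a small generic runner that collects per-item failures instead of aborting the whole batch on the first error. The run_concurrently helper and its worker callback are illustrative names, not SDK functions, and the sketch assumes items are hashable (for example, path strings).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Hashable, Iterable, Tuple


def run_concurrently(
    items: Iterable[Hashable],
    worker: Callable[[Any], Any],
    max_workers: int = 4,
) -> Tuple[Dict[Hashable, Any], Dict[Hashable, Exception]]:
    """Run worker(item) on a thread pool.

    Returns (results, errors), both keyed by the original item, so one
    failed upload does not hide the ones that succeeded.
    """
    results: Dict[Hashable, Any] = {}
    errors: Dict[Hashable, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, item): item for item in items}
        for future in as_completed(futures):
            item = futures[future]
            try:
                results[item] = future.result()
            except Exception as exc:  # record the failure and keep going
                errors[item] = exc
    return results, errors
```

Here worker would be a function that takes one path and runs upload_and_wait, wait_pipelines, and add_video for it. Keep max_workers modest so the batch does not trip the API's rate limits.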
Idempotent create with retry on rate limits
Wrap the create call so duplicate-name conflicts return the existing collection instead of raising, and 429 responses respect Retry-After.
import time
from videosdk import RateLimitError, ConflictError

def with_retries(call, *, retries: int = 5):
    for attempt in range(retries):
        try:
            return call()
        except RateLimitError as e:
            # Prefer the server's Retry-After; fall back to exponential backoff.
            wait = e.retry_after or (2 ** attempt)
            print(f"rate limited, sleeping {wait}s")
            time.sleep(wait)
    raise RuntimeError("retries exhausted")

def get_or_create_collection(name: str):
    try:
        return with_retries(lambda: client.collections.create(name=name))
    except ConflictError:
        page = client.collections.list(page=1, page_size=100)
        for c in page.data:
            if c.name == name:
                return c
        raise
Paginate every collection
collections.list is page-based — walk every result by incrementing page until you've yielded total.
def all_collections(page_size: int = 100):
    page = 1
    yielded = 0
    while True:
        result = client.collections.list(page=page, page_size=page_size)
        if not result.data:
            return
        for c in result.data:
            yield c
            yielded += 1
        if yielded >= result.total:
            return
        page += 1
Chat recipes
Multi-turn chat helper
The chat endpoint is stateless — wrap it in a small helper that owns the OpenAI-style history list so callers don't have to.
from videosdk import APIError

class Conversation:
    def __init__(self, client, *, video_id: str, top_k: int | None = None):
        self.client = client
        self.video_id = video_id
        self.top_k = top_k
        self.history: list[dict] = []

    def ask(self, question: str) -> str:
        result = self.client.chat.ask(
            question,
            video_id=self.video_id,
            top_k=self.top_k,
            history=self.history,
        )
        # Only reached on success, so failed turns never enter the history.
        self.history.append({"role": "user", "content": question})
        self.history.append({"role": "assistant", "content": result.answer})
        return result.answer

chat = Conversation(client, video_id="a249266f-052d-4148-bc2f-3efa76dc4269")
print(chat.ask("Describe what happens."))
print(chat.ask("How many people are involved?"))
print(chat.ask("What was my first question?"))
If the pipeline crashes (5xx), the server refunds the credit automatically. The helper also leaves history untouched on failure, because the APIError propagates out of ask before either append runs.
Search recipes
Search-then-play
Run a query, then jump the player to the first hit's start time.
# coll is an existing Collection, e.g. the one returned by bulk_upload_to_collection.
hits = client.search.search(
    "package being dropped",
    collection_id=coll.collection_id,
    top_k=10,
)
if not hits:
    print("no matches")
else:
    top = hits[0]
    print(f"open {top.hls_playlist_url}#t={top.start}")
The HLS URL is auth-gated — your player must inject X-API-Key, just like for client.videos.get(video_id).hls_playlist_path. See Playing back HLS.
Visualize a collection's embedding map
Group points by video so a frontend can render each video's segments in its assigned color.
from collections import defaultdict

def grouped_visualization(collection_id):
    viz = client.search.visualize(collection_id=collection_id)
    by_video = defaultdict(list)
    for point in viz.points:
        by_video[str(point.video_id)].append(point)
    for video_id, points in by_video.items():
        info = viz.videos[video_id]
        print(f"{video_id} color={info.color} segments={len(points)}")
    return viz, by_video

grouped_visualization(coll.collection_id)
visualize requires at least 2 segments across the selection — for very small collections you'll get BadRequestError. Catch it and skip rendering.
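A minimal sketch of that catch-and-skip pattern is shown below. To stay independent of the SDK it takes the call and the exception type as arguments. In real code you would pass the SDK's BadRequestError, which this sketch assumes is importable from videosdk like the other error classes. The helper name visualize_or_none is hypothetical.

```python
from typing import Callable, Optional, Type, TypeVar

T = TypeVar("T")


def visualize_or_none(
    visualize: Callable[[], T], too_small_error: Type[Exception]
) -> Optional[T]:
    """Call visualize(); return None instead of raising too_small_error."""
    try:
        return visualize()
    except too_small_error:
        # Not enough segments to project: the caller should skip rendering.
        return None
```

Usage would look like visualize_or_none(lambda: client.search.visualize(collection_id=coll.collection_id), BadRequestError). A 400 can also signal other problems with the request, so consider logging the exception before discarding it.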