Build/Python SDKv1

Mikshi Python SDK

The Mikshi Python SDK provides a robust interface for interacting with the Mikshi Video Understanding Platform. It simplifies authentication and efficiently processes asynchronous tasks. The SDK wraps the HTTP API in a typed client with Pydantic models for every response.

If you are calling the API directly, see the HTTP API reference instead.


Install

bash
pip install videosdk

Quick start

End-to-end: upload a video, wait for transcoding + HLS, run the analyse and search-process pipelines so the video is queryable, then run a vector search and ask a question about it.

python
import time
from videosdk import Client

with Client(
    api_key="sk_AnFZjh95_...",
    base_url="https://api.your-host.com",
) as client:
    # 1. Upload — chunks the file, PUTs to S3, completes, and kicks off
    # transcoding + HLS generation. Returns immediately with a video_id.
    job = client.uploads.upload_file("clip.mp4", name="My Clip")
    print(f"uploaded {job.video_id}")

    # 2. Wait for upload processing (transcode + HLS) to finish.
    while True:
        status = client.uploads.get_status(job.video_id)
        derived = status.derived_status()
        if derived == "completed":
            break
        if derived == "failed":
            raise RuntimeError(status.error_message or "upload processing failed")
        time.sleep(2)

    # 3. Run analyse + search-process. These two pipelines are what make the
    # video searchable and chat-ready — search/visualize/ask all require them.
    client.videos.initiate_analyse(job.video_id)
    client.videos.initiate_search_process(job.video_id)
    while True:
        a = client.videos.get_analyse_status(job.video_id)
        sp = client.videos.get_search_process_status(job.video_id)
        if a.is_terminal() and sp.is_terminal():
            if a.is_failed() or sp.is_failed():
                raise RuntimeError(f"pipeline failed: analyse={a.status}, search={sp.status}")
            break
        time.sleep(2)

    # 4. Vector search — ranked clip-level hits with pre-signed thumbnails and
    # the auth-gated HLS URL for each segment.
    hits = client.search.search(
        "person near the door",
        video_ids=[job.video_id],
        top_k=3,
    )
    for hit in hits:
        print(f"  #{hit.rank}  [{hit.start} - {hit.end}]  {hit.hls_playlist_url}")

    # 5. Chat — natural-language Q&A over the same video.
    result = client.chat.ask(
        "Summarize what happens in the video.",
        video_id=job.video_id,
    )
    print(result.answer)

The client is a context manager — use with so the underlying HTTP connection pool is closed cleanly. If you can't, call client.close() yourself.


Client

python
from videosdk import Client

client = Client(
    api_key="sk_...",          # required
    base_url="https://...",    # optional; defaults to http://localhost:8001
    timeout=30.0,              # optional; per-request timeout in seconds
    http_client=None,          # optional; pass a pre-configured httpx.Client
)

The client exposes six resource groups:

AttributeResource
client.uploadsMultipart, direct, YouTube, status
client.videosList, get, delete, processing pipelines
client.collectionsCollection CRUD + membership
client.chatNatural-language Q&A over a video
client.searchVector search + UMAP visualization
client.embeddingsRaw segment / text embedding vectors

Authentication is handled automatically — your key is sent as X-API-Key on every request.


Uploads

Four ways to ingest a video, ordered from highest-level to lowest:

  1. upload_file — recommended. Runs the full multipart flow for you.
  2. direct — single-shot upload for files ≤ 100 MB.
  3. import_youtube — server pulls from a YouTube URL.
  4. Manual multipart — only if you need retry, parallelism, or resume.

Runnable examples in examples/uploads/:

  • upload_file.py — high-level multipart helper (recommended)
  • upload_direct.py — single-shot upload
  • upload_youtube.py — server-side YouTube import
  • upload_multipart_manual.py — manual multipart, end-to-end
  • upload_resume.pylist_parts + abort_multipart against an in-progress upload
  • estimate_credits.py — preview the storage credit cost for a file (no network)
MethodHTTP
client.uploads.upload_file(path, *, name, ...)(helper — runs the full multipart flow)
client.uploads.direct(file, *, name, use_dynamic_hls=False)POST /api/backend/v1/developer/uploads/direct
client.uploads.import_youtube(url, name=None)POST /api/backend/v1/developer/uploads/youtube
client.uploads.initiate_multipart(filename, content_type)POST /api/backend/v1/developer/uploads/multipart/initiate
client.uploads.get_part_url(s3_object_name, upload_id, part_number)POST /api/backend/v1/developer/uploads/multipart/part-url
client.uploads.list_parts(s3_object_name, upload_id)POST /api/backend/v1/developer/uploads/multipart/list-parts
client.uploads.complete_multipart(s3_object_name, upload_id, parts)POST /api/backend/v1/developer/uploads/multipart/complete
client.uploads.abort_multipart(s3_object_name, upload_id)POST /api/backend/v1/developer/uploads/multipart/abort
client.uploads.preprocess(s3_object_name, video_id, name, use_dynamic_hls=False)POST /api/backend/v1/developer/uploads/preprocess
client.uploads.get_status(video_id)GET /api/backend/v1/developer/uploads/status/{video_id}
estimate_upload_credits(size_bytes) (pure helper, top-level import)(no network — mirrors the server's 1 credit per 25 MiB formula)

For files of any size, upload_file chunks the file, PUTs each chunk to S3, completes the multipart upload, and kicks off processing — all in one call.

python
job = client.uploads.upload_file("movie.mp4", name="My Movie")
print(job.video_id)   # poll get_status(video_id) for processing progress

With progress reporting:

python
def progress(uploaded, total):
    print(f"  {uploaded}/{total} bytes ({uploaded * 100 // total}%)")

job = client.uploads.upload_file(
    "movie.mp4",
    name="My Movie",
    chunk_size=25 * 1024 * 1024,  # 25 MB chunks (default is 10 MB)
    on_progress=progress,
)

What the helper does and doesn't do:

  • Sequential chunks (default 10 MB), guesses content_type from the extension, on_progress(uploaded, total) callback.
  • If anything fails before complete_multipart succeeds, the helper calls abort_multipart so you don't pay for orphaned S3 parts.
  • No retry on failed chunks, no parallel uploads, no resume. For any of those, fall back to the low-level methods below.
  • If preprocess fails after a successful S3 upload, the S3 object is left in place (it's durable at that point). Use the manual flow if you need to stash s3_object_name for retry.

Direct upload (≤ 100 MB)

python
job = client.uploads.direct("movie.mp4", name="My Movie")
print(job.video_id)

file accepts a path (str or Path) or an open binary file-like object. Raises PayloadTooLargeError on 413 — switch to upload_file for larger files.

YouTube import

python
job = client.uploads.import_youtube("https://www.youtube.com/watch?v=dQw4w9WgXcQ")

On failure the SDK raises YouTubeImportError (a subclass of BadRequestError) with the short code on .code. The full set of known codes is exported as YOUTUBE_FATAL_CODES:

python
from videosdk import YouTubeImportError

try:
    client.uploads.import_youtube(url)
except YouTubeImportError as e:
    print(f"YouTube import rejected: {e.code}")
    if e.code == "YOUTUBE_VIDEO_PRIVATE":
        ...  # skip

The SDK reads the code from whichever field the server uses (message on the dev API, detail on FastAPI defaults), so you don't need to inspect the response body yourself.

Manual multipart (only if the helper doesn't fit)

Use this when you need retry on failed parts, parallel chunk uploads, resume, or custom chunk handling.

python
import httpx

init = client.uploads.initiate_multipart("movie.mp4", "video/mp4")

parts = []
with open("movie.mp4", "rb") as f:
    part_number = 1
    while chunk := f.read(10 * 1024 * 1024):  # 10 MB chunks
        p = client.uploads.get_part_url(init.s3_object_name, init.upload_id, part_number)
        r = httpx.put(p.url, content=chunk)
        r.raise_for_status()
        parts.append({"part_number": part_number, "etag": r.headers["ETag"]})
        part_number += 1

client.uploads.complete_multipart(init.s3_object_name, init.upload_id, parts)
job = client.uploads.preprocess(init.s3_object_name, init.video_id, name="My Movie")

Poll status

get_status returns per-pipeline state only (no top-level overall_status). Use derived_status() to roll the three pipelines into a single pending / in_progress / completed / failed state:

python
import time

while True:
    status = client.uploads.get_status(job.video_id)
    derived = status.derived_status()
    if derived in ("completed", "failed"):
        break
    print(
        f"  transcode={getattr(status.transcoding_status, 'percentage', 0)}%  "
        f"hls={getattr(status.hls_generation_status, 'percentage', 0)}%"
    )
    time.sleep(2)

if derived == "failed":
    raise RuntimeError(status.error_message or "processing failed")

Per-pipeline status values are pending / in_progress / completed / failed.


Videos

Once a video is uploaded, these endpoints manage its lifecycle and run optional processing pipelines (search-process, analyse).

Runnable examples: examples/videos/list_videos.py, get_video.py, run_pipeline.py, delete_video.py.

MethodHTTP
client.videos.list(*, status=..., start_time=..., end_time=..., not_in_collection=..., skip=..., limit=..., sort_by=..., sort_order=...)GET /api/backend/v1/developer/videos/list
client.videos.get(video_id)GET /api/backend/v1/developer/videos/get-video-details
client.videos.delete(video_id)DELETE /api/backend/v1/developer/videos/delete
client.videos.initiate_search_process(video_id)POST /api/backend/v1/developer/videos/search-process/initiate
client.videos.get_search_process_status(video_id)GET /api/backend/v1/developer/videos/search-process/status/{video_id}
client.videos.initiate_analyse(video_id)POST /api/backend/v1/developer/videos/analyse/initiate
client.videos.get_analyse_status(video_id)GET /api/backend/v1/developer/videos/analyse/status/{video_id}
python
# List with filters (only set what you need; everything is keyword-only)
videos = client.videos.list(status="processed", limit=50, sort_order="asc")

# Single-video details (includes video_metadata)
details = client.videos.get(video_id)
print(details.name, details.video_metadata)

# Kick off the analyse pipeline, then poll
client.videos.initiate_analyse(video_id)
while True:
    s = client.videos.get_analyse_status(video_id)
    if s.is_terminal():
        break
    print(f"  analyse {s.percentage or 0}%")  # percentage is None on DB fallback
    time.sleep(2)

# Permanent delete (also removes derived S3 assets)
client.videos.delete(video_id)

status on list is one of uploaded, analysed, search_processed, processed. processed requires both pipelines to be done; the middle two filter on a single pipeline.

The pipeline initiate_* calls return 409 ConflictError if transcoding_status != "completed" — wait for upload processing to finish before kicking these off.

Playing back HLS

details.hls_playlist_path (and the same field on SearchHit, collection videos, etc.) is an auth-gated proxy URL — every manifest and segment fetch the player makes must carry the X-API-Key header. Don't embed the key in client-side code; proxy through your own backend or mint short-lived per-session keys.

See HLS playback in the HTTP reference for player-side wiring (hls.js, Video.js, AVPlayer, ExoPlayer).


Collections

Runnable examples: examples/collections/list_collections.py, collections_crud.py.

MethodHTTP
client.collections.list(page=1, page_size=10)GET /api/backend/v1/developer/collections/list
client.collections.create(name, description=None)POST /api/backend/v1/developer/collections/create
client.collections.update(id, *, name=None, description=None)PATCH /api/backend/v1/developer/collections/update/{id}
client.collections.delete(id)DELETE /api/backend/v1/developer/collections/delete/{id}
client.collections.list_videos(id, *, page=1, page_size=10, sort_by="created_at", sort_order="desc")GET /api/backend/v1/developer/collections/{id}/videos
client.collections.add_video(id, video_id)POST /api/backend/v1/developer/collections/{id}/videos
client.collections.remove_video(id, video_id)DELETE /api/backend/v1/developer/collections/{id}/videos/{video_id}

Notes

  • list returns a PaginatedCollections with .data, .page, .page_size, .total. Each collection carries up to four signed thumbnail URLs.
  • list_videos returns a PaginatedCollectionVideos with the same pagination metadata. Collection videos are always fully processed by add-time constraint.
  • update only sends fields you pass. Calling it with all defaults raises BadRequestError from the server.
  • add_video requires a fully-processed video — both analyse_status == "DONE" and search_process_status == "DONE". The server returns 400 BadRequestError if the video isn't ready and 409 ConflictError if it's already in the collection.
  • delete removes the collection and its video links — the underlying video records stay intact.

Chat

Natural-language Q&A over a video. Backed by the chat service's retrieval + rerank + LLM pipeline.

Runnable example: examples/chat/ask.py — single-turn ask, with an optional --stream flag for SSE.

MethodHTTP
client.chat.ask(question, *, video_id, top_k=None, history=None)POST /api/chat/v1/developer/chat/ask
client.chat.ask_stream(question, *, video_id, top_k=None, history=None)POST /api/chat/v1/developer/chat/ask (SSE, stream=true)

Notes

  • video_id is the target video. The caller must own it — unowned IDs raise PermissionDeniedError (403) before credit deduction.
  • Each call costs 1 credit. If the pipeline fails (5xx), the server automatically refunds the credit before re-raising — you are not billed for failed asks.
  • The server is stateless. To maintain a multi-turn conversation, the caller owns the history and resends prior turns on every call.
  • history is an OpenAI-style list of {"role": "user"|"assistant", "content": str} dicts (or ChatMessage instances). It is not (user, assistant) tuples.

Single-turn

python
result = client.chat.ask(
    "Describe what happens in this video.",
    video_id="a249266f-052d-4148-bc2f-3efa76dc4269",
)
print(result.answer)

Multi-turn

python
VIDEO_ID = "a249266f-052d-4148-bc2f-3efa76dc4269"
history: list[dict] = []

def turn(question: str) -> str:
    result = client.chat.ask(question, video_id=VIDEO_ID, history=history)
    history.append({"role": "user", "content": question})
    history.append({"role": "assistant", "content": result.answer})
    return result.answer

print(turn("Describe what happens in this video."))
print(turn("How many people are involved?"))
print(turn("What was my first question?"))

You can also pass typed ChatMessage instances — the SDK serializes either form to the same wire payload.

top_k

top_k controls how many segments the retriever pulls before rerank/LLM. Omit it to use the server default (5). Bumping it gives the LLM more context but raises latency:

python
client.chat.ask(
    "Walk me through every event in chronological order.",
    video_id=VIDEO_ID,
    top_k=20,
)

Streaming (SSE)

ask_stream yields one AskTokenEvent per LLM chunk, then either a single AskDoneEvent (full answer attached) or AskErrorEvent (mid-stream failure; credit auto-refunded). Use isinstance to dispatch on the event type:

python
import sys
from videosdk import AskTokenEvent, AskDoneEvent, AskErrorEvent

for event in client.chat.ask_stream(
    "How many people are involved?",
    video_id=VIDEO_ID,
):
    if isinstance(event, AskTokenEvent):
        sys.stdout.write(event.content)
        sys.stdout.flush()
    elif isinstance(event, AskDoneEvent):
        print()                       # full answer is also on event.answer
    elif isinstance(event, AskErrorEvent):
        print(f"\n[error] {event.detail}")
        break

The stream terminates after the done or error event — the iterator stops on its own.


Two endpoints power semantic search and visualization over segment embeddings. Both run against videos that have completed the analyse and search-process pipelines.

Runnable examples: examples/search/search.py, visualize.py.

MethodHTTP
client.search.search(query, *, video_ids=None, collection_id=None, top_k=5)POST /api/chat/v1/developer/search
client.search.visualize(*, video_ids=None, collection_id=None)POST /api/chat/v1/developer/visualize

Notes

  • You must provide at least one of video_ids or collection_id. Passing neither raises UnprocessableEntityError from the server.
  • The caller must own every video in the request. Unowned IDs raise PermissionDeniedError (403).
  • search costs 1 credit per call (refunded on failure). visualize is free.
  • visualize needs at least 2 segment embeddings across the selection (UMAP requires ≥2 points). Fewer raises BadRequestError.

Returns the top_k ranked clip-level hits. Each SearchHit carries pre-signed thumbnail_url and video_url, plus the auth-gated hls_playlist_url.

python
hits = client.search.search(
    "person walking near the gate",
    video_ids=["b2f1...-1111", "b2f1...-2222"],
    top_k=5,
)

for hit in hits:
    print(
        f"#{hit.rank}  [{hit.start} - {hit.end}]  "
        f"video={hit.video_id}\n"
        f"    hls:   {hit.hls_playlist_url}\n"
        f"    thumb: {hit.thumbnail_url}"
    )

You can scope to a collection instead:

python
hits = client.search.search("forklift incident", collection_id=coll.collection_id)

Or both together — videos and collection are unioned server-side.

Visualization

Returns a 2-D UMAP projection (cosine metric, MinMax-scaled to [0, 1]) of every segment across the requested videos, plus per-video color/HLS/signed URLs and per-segment thumbnails. Useful for plotting an embedding map in a frontend.

python
viz = client.search.visualize(collection_id="3fa8...")

print(len(viz.videos), "videos,", len(viz.points), "points")

for video_id, info in viz.videos.items():
    print(video_id, "color=", info.color)

for p in viz.points[:5]:
    print(f"  ({p.x:.4f}, {p.y:.4f})  segment={p.segment_id}  start={p.start}")

The response shape is:

python
class VisualizationVideo:
    video_id: UUID
    color: Optional[str]            # e.g. "hsl(217, 70%, 55%)"
    hls_playlist_url: Optional[str]
    video_url: Optional[str]

class VisualizationPoint:
    segment_id: str
    video_id: UUID
    x: float                # 0.0 - 1.0
    y: float                # 0.0 - 1.0
    start: str              # "HH:MM:SS"
    end: str                # "HH:MM:SS"
    thumbnail_url: Optional[str]

class Visualization:
    videos: Dict[str, VisualizationVideo]   # keyed by video_id (str)
    points: List[VisualizationPoint]

Embeddings

Two endpoints expose the raw vectors used by search and chat: the stored per-segment vectors for a video (read directly from the vector store, no recompute), and the embedding of an arbitrary text string with the active backend.

Runnable examples: examples/embeddings/video_embeddings.py, text_embedding.py.

MethodHTTP
client.embeddings.video(video_id)GET /api/chat/v1/developer/embeddings/video/{video_id}
client.embeddings.text(text)POST /api/chat/v1/developer/embeddings/text

Notes

  • Both calls cost 1 credit (refunded on 404 / 408 / 502 / 5xx).
  • video reads from the vector store. If the video's search-process pipeline hasn't finished yet, the server returns 409 and the SDK raises ConflictError — wait until get_search_process_status(video_id).is_done() and retry.
  • text must be 1–4096 characters.

Video embeddings

python
resp = client.embeddings.video("a249266f-052d-4148-bc2f-3efa76dc4269")

print(resp.status, resp.id, len(resp.data), "segments")
for seg in resp.data:
    print(
        f"  [{seg.start} - {seg.end}]  "
        f"option={seg.embedding_option}  scope={seg.embedding_scope}  "
        f"dims={len(seg.embedding)}"
    )

Text embeddings

python
resp = client.embeddings.text("chain wearing man")
[vector] = resp.data
print(f"dims={len(vector.embedding)}  status={resp.status}")

data is a list to keep the wire shape consistent with video, but the text endpoint always returns exactly one vector. Use a destructuring assignment as above if you want a single value.


Partner integrations

The two embedding endpoints make it straightforward to plug the platform's video embeddings into your own vector database. Every integration follows the same two steps:

  1. Once per video, after search-process finishes: call client.embeddings.video(video_id) and upsert the returned per-segment vectors into your DB.
  2. At query time: call client.embeddings.text(query) and run a top-K similarity query against your DB with the returned vector.

Each section below is self-contained — both SDK calls (client.embeddings.video and client.embeddings.text) appear inline next to the DB-specific code.

Notes that apply to every DB:

  • Vector dimension: 1024 with the current backend.
  • Metric: the platform stores vectors normalized for cosine similarity. Configure your DB to match (cosine, or dot-product on normalized vectors).
  • Idempotent upserts: generate each row id deterministically from (video_id, start, end) so re-ingesting the same video is safe. The snippets below use uuid.uuid5 for this — uuid.uuid4() would re-create rows every call.

Pinecone

Install: pip install pinecone-client.

python
import uuid
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="...")
pc.create_index(
    name="video-segments",
    dimension=1024,
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
index = pc.Index("video-segments")

# 1. Ingest — pull segment embeddings from our SDK, upsert into Pinecone.
resp = client.embeddings.video(video_id)
index.upsert(vectors=[
    (
        str(uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}")),
        seg.embedding,
        {"video_id": str(resp.id), "start": seg.start, "end": seg.end},
    )
    for seg in resp.data
])

# 2. Query — embed the user's text with our SDK, search Pinecone.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
hits = index.query(vector=qv.embedding, top_k=5, include_metadata=True)
for m in hits.matches:
    print(m.score, m.metadata["video_id"], m.metadata["start"])

Qdrant

Install: pip install qdrant-client.

python
import uuid
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

qd = QdrantClient(url="http://localhost:6333")
qd.recreate_collection(
    collection_name="video-segments",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
)

# 1. Ingest.
resp = client.embeddings.video(video_id)
qd.upsert(
    collection_name="video-segments",
    points=[
        PointStruct(
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}")),
            vector=seg.embedding,
            payload={"video_id": str(resp.id), "start": seg.start, "end": seg.end},
        )
        for seg in resp.data
    ],
)

# 2. Query.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
hits = qd.search(
    collection_name="video-segments",
    query_vector=qv.embedding,
    limit=5,
    with_payload=True,
)
for h in hits:
    print(h.score, h.payload["video_id"], h.payload["start"])

Weaviate

Install: pip install weaviate-client.

python
import weaviate
from weaviate.classes.config import Configure, Property, DataType
from weaviate.classes.query import MetadataQuery

client_wv = weaviate.connect_to_local()
client_wv.collections.create(
    name="VideoSegment",
    vectorizer_config=Configure.Vectorizer.none(),   # we supply our own vectors
    properties=[
        Property(name="video_id", data_type=DataType.TEXT),
        Property(name="start",    data_type=DataType.TEXT),
        Property(name="end",      data_type=DataType.TEXT),
    ],
)
coll = client_wv.collections.get("VideoSegment")

# 1. Ingest — Weaviate accepts our vector directly via `vector=`.
resp = client.embeddings.video(video_id)
with coll.batch.dynamic() as batch:
    for seg in resp.data:
        batch.add_object(
            properties={
                "video_id": str(resp.id),
                "start":    seg.start,
                "end":      seg.end,
            },
            vector=seg.embedding,
        )

# 2. Query.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
results = coll.query.near_vector(
    near_vector=qv.embedding,
    limit=5,
    return_metadata=MetadataQuery(distance=True),
)
for obj in results.objects:
    print(obj.metadata.distance, obj.properties["video_id"], obj.properties["start"])

Chroma

Install: pip install chromadb.

python
import uuid
import chromadb

chroma = chromadb.PersistentClient(path="./chroma-db")
coll = chroma.get_or_create_collection(
    name="video-segments",
    metadata={"hnsw:space": "cosine"},
)

# 1. Ingest.
resp = client.embeddings.video(video_id)
coll.upsert(
    ids=[
        str(uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}"))
        for seg in resp.data
    ],
    embeddings=[seg.embedding for seg in resp.data],
    metadatas=[
        {"video_id": str(resp.id), "start": seg.start, "end": seg.end}
        for seg in resp.data
    ],
)

# 2. Query.
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
hits = coll.query(query_embeddings=[qv.embedding], n_results=5)
for id_, dist, meta in zip(hits["ids"][0], hits["distances"][0], hits["metadatas"][0]):
    print(dist, meta["video_id"], meta["start"])

pgvector (Postgres)

Install: pip install "psycopg[binary]" pgvector.

python
import uuid
import psycopg
from pgvector.psycopg import register_vector

conn = psycopg.connect("dbname=video")
register_vector(conn)

conn.execute("""
    CREATE TABLE IF NOT EXISTS video_segments (
        segment_id  uuid       PRIMARY KEY,
        video_id    uuid       NOT NULL,
        start_ts    text       NOT NULL,
        end_ts      text       NOT NULL,
        embedding   vector(1024)
    );
    CREATE INDEX IF NOT EXISTS video_segments_vec_idx
        ON video_segments USING hnsw (embedding vector_cosine_ops);
""")

# 1. Ingest.
resp = client.embeddings.video(video_id)
with conn.cursor() as cur:
    for seg in resp.data:
        cur.execute(
            """
            INSERT INTO video_segments (segment_id, video_id, start_ts, end_ts, embedding)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (segment_id) DO UPDATE SET embedding = EXCLUDED.embedding
            """,
            (
                uuid.uuid5(uuid.NAMESPACE_URL, f"{resp.id}|{seg.start}|{seg.end}"),
                resp.id,
                seg.start,
                seg.end,
                seg.embedding,
            ),
        )
conn.commit()

# 2. Query — `<=>` is cosine distance (smaller = closer).
query = client.embeddings.text("man wearing striped shirt")
[qv] = query.data
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT segment_id, video_id, start_ts, end_ts, embedding <=> %s AS distance
        FROM video_segments
        ORDER BY embedding <=> %s
        LIMIT 5
        """,
        (qv.embedding, qv.embedding),
    )
    for segment_id, video_id, start_ts, end_ts, distance in cur.fetchall():
        print(distance, video_id, start_ts)

Resolving a hit back to a playable clip

Whichever DB you use, the payload carries video_id and start — enough to call back into the platform for a signed HLS URL:

python
def hit_to_playable(video_id: str, start: str):
    details = client.videos.get(video_id)
    return f"{details.hls_playlist_path}#t={start}"

details.hls_playlist_path is the auth-gated proxy URL — see Playing back HLS for the player wiring.


Errors

All errors inherit from VideoSDKError. HTTP status codes map to specific subclasses so you can catch what matters:

HTTPException
400BadRequestErrorYouTubeImportError is a subclass raised by uploads.import_youtube with the short code on .code
401AuthenticationError
403PermissionDeniedError
404NotFoundError
409ConflictError
413PayloadTooLargeError
422UnprocessableEntityError
429RateLimitError (sets .retry_after from the Retry-After header)
500InternalServerError
502BadGatewayError
503ServiceUnavailableError
otherAPIError

Every exception has .status_code and .response_body attributes.

python
from videosdk import Client, ConflictError, RateLimitError
import time

try:
    client.collections.create(name="already-taken")
except ConflictError:
    print("name is taken")
except RateLimitError as e:
    if e.retry_after:
        time.sleep(e.retry_after)

Models

Every response is parsed into a Pydantic model. Models accept extra fields (extra="allow") so additive server changes won't break your client.

UploadJob

Returned by direct, import_youtube, preprocess, and upload_file. The server's 202 response is just {"video_id": "..."} — poll get_status(video_id) to track processing.

python
class UploadJob:
    video_id: UUID

UploadStatus

Returned by client.uploads.get_status(video_id). Per-pipeline only — there is no top-level overall_status. Call derived_status() to collapse the three pipelines into a single state.

python
class TaskStatus:
    status: str          # "pending" | "in_progress" | "completed" | "failed"
    percentage: int = 0
    error: Optional[str]

class UploadStatus:
    video_id: UUID
    thumbnail_generation_status: Optional[TaskStatus]
    transcoding_status: Optional[TaskStatus]
    hls_generation_status: Optional[TaskStatus]
    error_message: Optional[str]

    def derived_status(self) -> str:
        """Returns 'pending' | 'in_progress' | 'completed' | 'failed' —
        'failed' if any pipeline failed, 'completed' if all three did,
        'in_progress' if any has started, 'pending' otherwise."""

Multipart helpers

python
class MultipartInitiate:
    upload_id: str
    s3_object_name: str
    video_id: UUID

class PartUrl:
    url: str
    part_number: int

class UploadedPart:
    part_number: int
    etag: str

class CompletedMultipart:
    s3_object_name: str
    upload_id: str
    etag: str

Video

FieldTypeNotes
video_idUUID
namestr
statusOptional[str]
video_pathOptional[str]Pre-signed.
thumbnail_pathOptional[str]Pre-signed.
hls_playlist_pathOptional[str]Auth-gated proxy URL.
transcoding_statusOptional[str]
hls_generation_statusOptional[str]
thumbnail_generation_statusOptional[str]
created_atOptional[datetime]

VideoListItem / VideoDetails

Returned by client.videos.list and client.videos.get respectively. VideoDetails adds a video_metadata: Optional[Dict[str, Any]] field for codec/duration/dimensions data.

PipelineEnqueued / PipelineStatus

Returned by client.videos.initiate_* and client.videos.get_*_status.

python
class PipelineEnqueued:
    video_id: UUID
    status: str          # "enqueued"

class PipelineStatus:
    video_id: UUID
    status: str          # see note below — not a fixed enum
    percentage: Optional[int]   # None on DB fallback (no Redis hit)
    error: Optional[str]

    def is_done(self) -> bool: ...      # status in {"done", "success", "completed"}
    def is_failed(self) -> bool: ...    # status in {"failed", "error"}
    def is_terminal(self) -> bool: ...  # is_done() or is_failed()

The two pipelines use different status vocabularies server-side: analyse returns "done" on success, search-process returns "success". Use s.is_done() / s.is_failed() / s.is_terminal() for polling loops instead of comparing the string directly.

Collection

FieldTypeNotes
collection_idUUID
namestr
descriptionOptional[str]
parent_idOptional[UUID]The owning user's id.
parent_typeOptional[str]Currently always "user".
created_atdatetime
video_countintDefaults to 0.
thumbnailsOptional[List[str]]Up to four signed thumbnail URLs.

PaginatedCollections

Returned by client.collections.list.

python
class PaginatedCollections:
    data: List[Collection]
    page: int
    page_size: int
    total: int

PaginatedCollectionVideos

Returned by client.collections.list_videos. data is a list of Video objects, each carrying pre-signed video_path, thumbnail_path, and the auth-gated hls_playlist_path.

python
class PaginatedCollectionVideos:
    data: List[Video]
    page: int
    page_size: int
    total: int

AskResponse

Returned by client.chat.ask.

python
class AskResponse:
    answer: str

The model uses extra="allow" so any future server-side fields (e.g. citations, retrieved segments) will be available via attribute access without an SDK upgrade.

SearchHit

Returned by client.search.search. start / end are HH:MM:SS timestamp strings.

python
class SearchHit:
    rank: int
    start: str              # "HH:MM:SS"
    end: str                # "HH:MM:SS"
    video_id: UUID
    thumbnail_url: Optional[str]
    hls_playlist_url: Optional[str]
    video_url: Optional[str]

Visualization

Returned by client.search.visualize. See the Search section for full field details.

python
class VisualizationVideo:
    video_id: UUID
    color: Optional[str]
    hls_playlist_url: Optional[str]
    video_url: Optional[str]

class VisualizationPoint:
    segment_id: str
    video_id: UUID
    x: float
    y: float
    start: str              # "HH:MM:SS"
    end: str                # "HH:MM:SS"
    thumbnail_url: Optional[str]

class Visualization:
    videos: Dict[str, VisualizationVideo]
    points: List[VisualizationPoint]

VideoEmbeddingsResponse

Returned by client.embeddings.video.

python
class VideoSegmentEmbedding:
    embedding: List[float]
    embedding_option: Optional[str]   # e.g. "visual"
    embedding_scope: Optional[str]    # e.g. "clip"
    start: Optional[str]              # "HH:MM:SS"
    end: Optional[str]                # "HH:MM:SS"

class VideoEmbeddingsResponse:
    id: UUID                          # echo of the requested video_id
    status: str                       # "ready"
    data: List[VideoSegmentEmbedding]

TextEmbeddingResponse

Returned by client.embeddings.text. data is a single-element list — the wire shape mirrors VideoEmbeddingsResponse for consistency.

python
class TextEmbedding:
    embedding: List[float]

class TextEmbeddingResponse:
    status: str                       # "ready"
    data: List[TextEmbedding]

Recipes

End-to-end workflows that combine multiple SDK calls. Every recipe assumes you've already constructed a Client:

python
from videosdk import Client

client = Client(api_key="sk_...", base_url="https://api.your-host.com")

Upload recipes

Upload a file and play it back

The full happy path: ingest a file, wait for transcoding + HLS to finish, hand the playlist URL to a player.

python
import time

def upload_and_wait(path: str, name: str, poll_interval: float = 2.0):
    job = client.uploads.upload_file(path, name=name)
    print(f"queued {job.video_id}")

    while True:
        status = client.uploads.get_status(job.video_id)
        derived = status.derived_status()
        if derived == "completed":
            break
        if derived == "failed":
            raise RuntimeError(f"upload failed: {status.error_message}")
        print(
            f"  transcode={getattr(status.transcoding_status, 'percentage', 0)}%  "
            f"hls={getattr(status.hls_generation_status, 'percentage', 0)}%"
        )
        time.sleep(poll_interval)

    return client.videos.get(job.video_id)

video = upload_and_wait("clip.mp4", name="My Clip")
print("HLS playlist:", video.hls_playlist_path)

video.hls_playlist_path is the auth-gated proxy URL — see Playing back HLS for the player wiring.

Resumable multipart upload

The upload_file helper aborts on any failure. If you need to survive crashes (e.g. a long upload over a flaky network), drive multipart yourself and persist the upload_id so you can resume from where list_parts says you left off.

python
import json
import httpx
from pathlib import Path

CHUNK = 10 * 1024 * 1024  # 10 MB
STATE_FILE = Path(".upload-state.json")

def resume_or_start(local_path: str, name: str):
    file_path = Path(local_path)
    if STATE_FILE.exists():
        state = json.loads(STATE_FILE.read_text())
        print(f"resuming upload {state['upload_id']}")
    else:
        init = client.uploads.initiate_multipart(file_path.name, "video/mp4")
        state = {
            "upload_id": init.upload_id,
            "s3_object_name": init.s3_object_name,
            "video_id": str(init.video_id),
        }
        STATE_FILE.write_text(json.dumps(state))

    already = client.uploads.list_parts(state["s3_object_name"], state["upload_id"])
    done_part_numbers = {p.part_number for p in already}
    parts = [p.model_dump() for p in already]

    with file_path.open("rb") as f, httpx.Client(timeout=300.0) as s3:
        part_number = 1
        while chunk := f.read(CHUNK):
            if part_number not in done_part_numbers:
                p = client.uploads.get_part_url(
                    state["s3_object_name"], state["upload_id"], part_number
                )
                r = s3.put(p.url, content=chunk)
                r.raise_for_status()
                parts.append({"part_number": part_number, "etag": r.headers["ETag"]})
            part_number += 1

    parts.sort(key=lambda p: p["part_number"])
    client.uploads.complete_multipart(
        state["s3_object_name"], state["upload_id"], parts
    )
    job = client.uploads.preprocess(
        state["s3_object_name"], state["video_id"], name=name
    )
    STATE_FILE.unlink()
    return job

resume_or_start("big-movie.mp4", "Big Movie")

Real implementations should fseek to skip already-uploaded ranges instead of re-reading every chunk.

YouTube import with error branching

Branch on YouTubeImportError.code so private/age-restricted videos are skipped instead of crashing a batch import.

python
from videosdk import YouTubeImportError

def safe_youtube_import(url: str):
    try:
        return client.uploads.import_youtube(url)
    except YouTubeImportError as e:
        if e.code == "YOUTUBE_VIDEO_PRIVATE":
            print("video is private, skipping")
            return None
        if e.code == "YOUTUBE_VIDEO_AGE_RESTRICTED":
            print("age-restricted, skipping")
            return None
        raise

See YouTube import for the full set of fatal codes (also available as videosdk.YOUTUBE_FATAL_CODES).

Video recipes

Pipeline orchestration: backfill missing analysis

Find every video that's been transcoded but hasn't run through analyse or search-process, and kick off both pipelines.

python
from concurrent.futures import ThreadPoolExecutor

def backfill_pipelines():
    pending = [
        v for v in client.videos.list(limit=500)
        if v.transcoding_status == "completed"
        and (v.analyse_status != "done" or v.search_process_status != "done")
    ]
    print(f"backfilling {len(pending)} videos")

    def kick_off(v):
        if v.analyse_status != "done":
            client.videos.initiate_analyse(v.video_id)
        if v.search_process_status != "done":
            client.videos.initiate_search_process(v.video_id)

    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(kick_off, pending))

    return [v.video_id for v in pending]

backfill_pipelines()

Pair with a small polling loop that hits get_analyse_status / get_search_process_status if you want to wait for them all to finish.

Walk videos in a date range

videos.list uses skip/limit (not page). Iterate by incrementing skip until a short page tells you you've reached the end.

python
from datetime import datetime, timezone, timedelta

def recent_videos(limit_per_call: int = 100):
    week_ago = datetime.now(tz=timezone.utc) - timedelta(days=7)
    skip = 0
    while True:
        batch = client.videos.list(
            start_time=week_ago,
            sort_by="created_at",
            sort_order="desc",
            skip=skip,
            limit=limit_per_call,
        )
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_call:
            return
        skip += limit_per_call

for video in recent_videos():
    print(video.video_id, video.name, video.created_at)

Collection recipes

Bulk-upload a folder into a new collection

Create a collection, upload every file, run analyse + search-process (since add_video requires both pipelines DONE), then add each video to the collection.

python
import time
from pathlib import Path

def wait_pipelines(video_id):
    """Block until both analyse and search-process finish."""
    client.videos.initiate_analyse(video_id)
    client.videos.initiate_search_process(video_id)
    while True:
        a = client.videos.get_analyse_status(video_id)
        s = client.videos.get_search_process_status(video_id)
        if a.is_terminal() and s.is_terminal():
            if a.is_failed() or s.is_failed():
                raise RuntimeError(f"pipeline failed: analyse={a.status}, search={s.status}")
            return
        time.sleep(2)

def bulk_upload_to_collection(folder: str, collection_name: str):
    coll = client.collections.create(name=collection_name)
    print(f"created collection {coll.collection_id}")

    for path in sorted(Path(folder).glob("*.mp4")):
        video = upload_and_wait(str(path), name=path.stem)   # see "Upload a file and play it back"
        wait_pipelines(video.video_id)
        client.collections.add_video(coll.collection_id, video.video_id)
        print(f"  added {path.name}")

    return coll

bulk_upload_to_collection("./footage", "Warehouse - April")

add_video requires analyse_status == "DONE" AND search_process_status == "DONE". Skipping the pipeline step will get you a 400 BadRequestError. For real workloads, run the uploads concurrently with concurrent.futures.ThreadPoolExecutorClient is built on httpx.Client, which is thread-safe for separate request calls.

Idempotent create with retry on rate limits

Wrap the create call so duplicate-name conflicts return the existing collection instead of raising, and 429 responses respect Retry-After.

python
import time
from videosdk import RateLimitError, ConflictError

def with_retries(call, *, retries: int = 5):
    for attempt in range(retries):
        try:
            return call()
        except RateLimitError as e:
            wait = e.retry_after or (2 ** attempt)
            print(f"rate limited, sleeping {wait}s")
            time.sleep(wait)
    raise RuntimeError("retries exhausted")

def get_or_create_collection(name: str):
    try:
        return with_retries(lambda: client.collections.create(name=name))
    except ConflictError:
        page = client.collections.list(page=1, page_size=100)
        for c in page.data:
            if c.name == name:
                return c
        raise

Paginate every collection

collections.list is page-based — walk every result by incrementing page until you've yielded total.

python
def all_collections(page_size: int = 100):
    page = 1
    yielded = 0
    while True:
        result = client.collections.list(page=page, page_size=page_size)
        if not result.data:
            return
        for c in result.data:
            yield c
            yielded += 1
        if yielded >= result.total:
            return
        page += 1

Chat recipes

Multi-turn chat helper

The chat endpoint is stateless — wrap it in a small helper that owns the OpenAI-style history list so callers don't have to.

python
from videosdk import APIError

class Conversation:
    def __init__(self, client, *, video_id: str, top_k: int | None = None):
        self.client = client
        self.video_id = video_id
        self.top_k = top_k
        self.history: list[dict] = []

    def ask(self, question: str) -> str:
        result = self.client.chat.ask(
            question,
            video_id=self.video_id,
            top_k=self.top_k,
            history=self.history,
        )
        self.history.append({"role": "user", "content": question})
        self.history.append({"role": "assistant", "content": result.answer})
        return result.answer

chat = Conversation(client, video_id="a249266f-052d-4148-bc2f-3efa76dc4269")
print(chat.ask("Describe what happens."))
print(chat.ask("How many people are involved?"))
print(chat.ask("What was my first question?"))

If the pipeline crashes (5xx), the server refunds the credit automatically — but the helper doesn't append to history on failure since an APIError propagates.

Search recipes

Search-then-play

Run a query, then jump the player to the first hit's start time.

python
hits = client.search.search(
    "package being dropped",
    collection_id=coll.collection_id,
    top_k=10,
)
if not hits:
    print("no matches")
else:
    top = hits[0]
    print(f"open {top.hls_playlist_url}#t={top.start}")

The HLS URL is auth-gated — your player must inject X-API-Key, just like for client.videos.get(video_id).hls_playlist_path. See Playing back HLS.

Visualize a collection's embedding map

Group points by video so a frontend can render each video's segments in its assigned color.

python
from collections import defaultdict

def grouped_visualization(collection_id):
    viz = client.search.visualize(collection_id=collection_id)
    by_video = defaultdict(list)
    for point in viz.points:
        by_video[str(point.video_id)].append(point)

    for video_id, points in by_video.items():
        info = viz.videos[video_id]
        print(f"{video_id}  color={info.color}  segments={len(points)}")
    return viz, by_video

grouped_visualization(coll.collection_id)

visualize requires at least 2 segments across the selection — for very small collections you'll get BadRequestError. Catch it and skip rendering.