A video indexing task represents a request to upload and index a video. The resources.Task
class provides methods to manage your video indexing tasks.
Methods
Create a video indexing task
Description: This method creates a new video indexing task that uploads and indexes a video.
Function signature and example:
def create(
self,
index_id: str,
*,
file: Union[str, BinaryIO, None] = None,
url: Optional[str] = None,
transcription_file: Union[str, BinaryIO, None] = None,
transcription_url: Optional[str] = None,
language: Optional[str] = None,
disable_video_stream: Optional[bool] = None,
**kwargs,
) -> models.Task
from twelvelabs.models.task import Task
task = client.task.create(
index_id=<"<YOUR_INDEX_ID>",
file="<YOUR_FILE_PATH>",
transcription_file="<YOUR_TRANSCRIPTION_FILE>"
)
print(f"Task id={task.id}")
# Utility function to print the status of a video indexing task
def on_task_update(task: Task):
print(f" Status={task.status}")
task.wait_for_done(sleep_interval=5, callback=on_task_update)
if task.status != "ready":
raise RuntimeError(f"Indexing failed with status {task.status}")
print(f"Video ID: {task.video_id}")
Parameters:
Name | Type | Required | Description |
---|---|---|---|
index_d | str | Yes | The unique identifier of the index to which the video will be uploaded. |
file | Union[str, BinaryIO, None] | No | Path to the video file or a file-like object. |
url | Optional[str] | No | The publicly accessible URL of the video you want to upload. |
transcription_file | Union[str, BinaryIO, None] | No | Path to the transcription file or a file-like object. |
transcription_url | Optional[str] | No | The URL of the transcription file. |
language | Optional[str] | No | The language of the video. |
disable_video_stream | Optional[str] | No | Indicates if the platform stores the video for streaming. |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a models.Task
instance representing the newly created video indexing task.
Note:
As shown in the example code above, you can use the
waitForDone
method of themodels.Task
class to monitor the status of a video indexing task until it completes.
API Reference: For a description of each field in the request and response, see the Create a video indexing task page.
Related guide: Upload single videos.
Retrieve a video indexing task
Description: This method retrieves the details of a specific task.
Function signature and example:
def retrieve(self, id: str, **kwargs) -> models.Task
retrieved_task = client.task.retrieve("<YOUR_TASK_ID>")
print(f"Task ID: {retrieved_task.id}")
print(f"Index ID: {retrieved_task.index_id}")
print(f"Video ID: {retrieved_task.video_id}")
print(f"Estimated time: {retrieved_task.estimated_time}")
print(f"Status: {retrieved_task.status}")
print("Metadata:")
for key, value in retrieved_task.metadata.items():
print(f" {key}: {value}")
if retrieved_task.hls:
print("HLS:")
print(f" Video URL: {retrieved_task.hls.video_url}")
print(" Thumbnail URLs:")
for url in retrieved_task.hls.thumbnail_urls or []:
print(f" {url}")
print(f" Status: {retrieved_task.hls.status}")
print(f" Updated at: {retrieved_task.hls.updated_at}")
if retrieved_task.process:
print("Process:")
print(f" Percentage: {retrieved_task.process.percentage}%")
print(f" Remaining Seconds: {retrieved_task.process.remain_seconds}")
print(f"Created at: {retrieved_task.created_at}")
print(f"Updated at: {retrieved_task.updated_at}")
Parameters
Name | Type | Required | Description |
---|---|---|---|
id | str | Yes | The unique identifier of the task to retrieve. |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a models.Task
object representing the retrieved video indexing task.
API Reference: For a description of each field in the response, see the Retrieve a video indexing task.
List video indexing tasks with direct pagination
Description: This method returns a list of the video indexing tasks in your account. By default, the platform returns your video indexing tasks sorted by creation date, with the newest at the top of the list. Choose this method mainly when the total number of items is manageable, or you must fetch a single page of results.
Function signature and example:
def list(
self,
*,
id: Optional[str] = None,
index_id: Optional[str] = None,
filename: Optional[str] = None,
duration: Optional[float] = None,
width: Optional[int] = None,
height: Optional[int] = None,
created_at: Optional[Union[str, Dict[str, str]]] = None,
updated_at: Optional[Union[str, Dict[str, str]]] = None,
estimated_time: Optional[str] = None,
page: Optional[int] = None,
page_limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_option: Optional[str] = None,
**kwargs,
) -> RootModelList[models.Task]:
tasks = client.task.list(
id="<YOUR_TASK_ID>",
index_id="<YOUR_INDEX_ID>",
filename="<YOUR_FILENAME>",
duration=20,
width=1920,
height=1080,
sort_by = "updated_at",
sort_option="asc",
created_at="2024-09-17T07:53:46.365Z",
updated_at="2024-09-17T07:53:46.365Z",
estimated_time="2024-09-17T07:55:22.125Z",
page=2,
page_limit=5,
)
for task in tasks:
print(f"Task ID: {task.id}")
print(f"Index ID: {task.index_id}")
print(f"Video ID: {task.video_id}")
print(f"Estimated time: {task.estimated_time}")
print(f"Status: {task.status}")
print("Metadata:")
for key, value in task.metadata.items():
print(f" {key}: {value}")
if task.hls:
print("HLS:")
print(f" Video URL: {task.hls.video_url}")
print(" Thumbnail URLs:")
for url in task.hls.thumbnail_urls or []:
print(f" {url}")
print(f" Status: {task.hls.status}")
print(f" Updated at: {task.hls.updated_at}")
if task.process:
print("Process:")
print(f" Percentage: {task.process.percentage}%")
print(f" Remaining Seconds: {task.process.remain_seconds}")
print(f"Created at: {task.created_at}")
print(f"Updated at: {task.updated_at}")
Parameters:
Name | Type | Required | Description |
---|---|---|---|
id | Optional[str] | No | Filter by the unique identifier of a video indexing task. |
index_id | Optional[str] | No | Filter by the unique identifier of an index. |
filename | Optional[str] | No | Filter by filename. |
duration | Optional[float] | No | Filter by duration expressed in seconds. |
width | Optional[int] | No | Filter by width. |
height | Optional[int] | No | Filter by height. |
created_at | Optional[Union[str, Dict[str, str]]] | No | Filter by the creation date of the task. This parameter can be a string or a dictionary with string keys and values for range queries. |
updated_at | Optional[Union[str, Dict[str, str]]] | No | Filter by the last update date of the task. This parameter can be a string or a dictionary with string keys and values for range queries. |
estimated_time | Optional[str] | No | Filter by the estimated processing time, expressed in seconds. |
page | Optional[int] | No | Page number for pagination. Defaults to 1. |
page_limit | Optional[int] | No | Number of items per page. Defaults to 10. |
sort_by | Optional[str] | No | Field to sort by ("created_at" or "updated_at"). Defaults to "created_at". |
sort_option | Optional[str] | No | Sort order ("asc" or "desc"). Defaults to "desc". |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a RootModelList
containing models.Task
objects representing the list of videos that match the specified criteria.
API Reference: For a description of each field in the request and response, see the List video indexing tasks page.
Related guides:
List video indexing tasks with iterative pagination
Description: This method returns a paginated list of the video indexing tasks in your account. Choose this method mainly when your application must retrieve a large number of items. By default, the platform returns your video indexing tasks sorted by creation date, with the newest at the top of the list.
Function signature and example:
def list_pagination(
self,
*,
id: Optional[str] = None,
index_id: Optional[str] = None,
filename: Optional[str] = None,
duration: Optional[float] = None,
width: Optional[int] = None,
height: Optional[int] = None,
created_at: Optional[Union[str, Dict[str, str]]] = None,
updated_at: Optional[Union[str, Dict[str, str]]] = None,
estimated_time: Optional[str] = None,
page: Optional[int] = None,
page_limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_option: Optional[str] = None,
**kwargs,
) -> models.TaskListWithPagination
def print_page(page):
for task in page:
print(f"Task ID: {task.id}")
print(f"Index ID: {task.index_id}")
print(f"Video ID: {task.video_id}")
print(f"Estimated time: {task.estimated_time}")
print(f"Status: {task.status}")
print("Metadata:")
for key, value in task.metadata.items():
print(f" {key}: {value}")
if task.hls:
print("HLS:")
print(f" Video URL: {task.hls.video_url}")
print(" Thumbnail URLs:")
for url in task.hls.thumbnail_urls or []:
print(f" {url}")
print(f" Status: {task.hls.status}")
print(f" Updated at: {task.hls.updated_at}")
if task.process:
print("Process:")
print(f" Percentage: {task.process.percentage}%")
print(f" Remaining Seconds: {task.process.remain_seconds}")
print(f"Created at: {task.created_at}")
print(f"Updated at: {task.updated_at}")
# Fetch the initial page of results
task_paginator = client.task.list_pagination(
id="<YOUR_TASK_ID>",
index_id="<YOUR_INDEX_ID>",
filename="<YOUR_FILENAME>",
duration=20,
width=1920,
height=1080,
sort_by = "updated_at",
sort_option="asc",
created_at="2024-09-17T07:53:46.365Z",
updated_at="2024-09-17T07:53:46.365Z",
estimated_time="2024-09-17T07:55:22.125Z",
page=2,
page_limit=5,
)
# Print the first page of results
print_page(task_paginator.data)
# Iterate through subsequent pages
while True:
try:
next_task_page = next(task_paginator)
print_page(next_task_page)
except StopIteration:
break
Parameters:
Name | Type | Required | Description |
---|---|---|---|
id | Optional[str] | No | Filter by the unique identifier of a video indexing task. |
index_id | Optional[str] | No | Filter by the unique identifier of an index. |
filename | Optional[str] | No | Filter by filename. |
duration | Optional[float] | No | Filter by duration expressed in seconds. |
width | Optional[int] | No | Filter by width. |
height | Optional[int] | No | Filter by height. |
created_at | Optional[Union[str, Dict[str, str]]] | No | Filter by the creation date of the task. This parameter can be a string or a dictionary with string keys and values for range queries. |
updated_at | Optional[Union[str, Dict[str, str]]] | No | Filter by the last update date of the task. This parameter can be a string or a dictionary with string keys and values for range queries. |
estimated_time | Optional[str] | No | Filter by the estimated processing time, expressed in seconds. |
page | Optional[int] | No | Page number for pagination. Defaults to 1. |
page_limit | Optional[int] | No | Number of items per page. Defaults to 10. |
sort_by | Optional[str] | No | Field to sort by ("created_at" or "updated_at"). Defaults to "created_at". |
sort_option | Optional[str] | No | Sort order ("asc" or "desc"). Defaults to "desc". |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a models.TaskListWithPagination
object containing the list of videos that match the specified criteria and pagination information.
Note:
To retrieve subsequent pages of results, use the iterator protocol:
- Invoke the
next
function, passing theTaskListWithPagination
object as a parameter.- Repeat this call until a
StopIteration
exception occurs, indicating no more pages exist.
API Reference: For a description of each field in the request and response, see the List video indexing tasks page.
Related guides:
Delete a video indexing task
Description: This method deletes an existing video indexing task.
Function signature and example:
def delete(self, id: str, **kwargs) -> None
client.task.delete("<YOUR_TASK_ID>");
Parameters:
Name | Type | Required | Description |
---|---|---|---|
id | str | Yes | The unique identifier of the video indexing task to delete. |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: None
. This method doesn't return any data upon successful completion.
API Reference: Delete a video indexing task.
Make a cloud-to-cloud-transfer
NOTE: This method will be deprecated. Twelve Labs recommends you use the Import videos method instead.
Description: This method makes a cloud-to-cloud transfer. Cloud-to-cloud transfers allow you to upload multiple videos at once by grouping multiple video indexing operations in a single API call. Initially, this feature is supported for the "us-west-2" region of AWS S3.
Function signature and example:
def transfer(self, file: BinaryIO, **kwargs) -> None
client.task.transfer(file="<YOUR_JSON_FILE>")
Parameters
Name | Type | Required | Description |
---|---|---|---|
file | BinaryIO | Yes | A JSON file that contains a list of the files you wish to upload. |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: None
. This method doesn't return any data upon successful completion.
API Reference: For a description of each field in the request, see the Make a cloud-to-cloud transfer page.
Related guide: Cloud-to-cloud transfers
Import videos
Description: An import represents the process of uploading and indexing all videos from the specified integration. This method initiates an asynchronous import.
Function signature and example:
def import_videos(
self,
integration_id: str,
index_id: str,
user_metadata: Optional[Dict[str, Any]] = None,
incremental_import: Optional[bool] = None,
retry_failed: Optional[bool] = None,
**kwargs,
) -> models.TransferImportResponse
res = client.task.transfers.import_videos(
"<YOUR_INTEGRATION_ID>",
"<YOUR_INDEX_ID>",
)
for video in res.videos:
print(f"video: {video.video_id} {video.filename}")
if res.failed_files:
for failed_file in res.failed_files:
print(f"failed_file: {failed_file.filename} {failed_file.error_message}")
Parameters:
Name | Type | Required | Description |
---|---|---|---|
integration_id | str | Yes | The unique identifier of the integration for which you want to import videos. |
index_id | str | Yes | The unique identifier of the index to which the videos are being uploaded. |
user_metadata | Optional[Dict[str, Any]] = None | No | Metadata that helps you categorize your videos. |
incremental_import | Optional[bool] = None | No | Specifies whether or not incremental sync is enabled. If set to false , the platform will synchronize all the files in the bucket. |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a models.TransferImportResource
object containing two lists:
- Videos that will be imported.
- Videos that will not be imported, typically due to unmet prerequisites .
API Refference: Import videos.
Related guide: Cloud-to-cloud integrations.
Retrieve import status
Description: This method retrieves the current status for each video from a specified integration and index.
Function signature and example:
def import_status(
self, integration_id: str, index_id: str, **kwargs
) -> models.TransferImportStatusResponse
status = client.task.transfers.import_status(integration_id, index_id)
for ready in status.ready:
print(f"ready: {ready.video_id} {ready.filename} {ready.created_at}")
for failed in status.failed:
print(f"failed: {failed.filename} {failed.error_message}")
Parameters:
Name | Type | Required | Description |
---|---|---|---|
integration_id | str | Yes | The unique identifier of the integration for which you want to retrieve the status of your imported videos. |
index_id | str | Yes | The unique identifier of the index for which you want to retrieve the status of your imported videos. |
**kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a models.TransferImportStatusResponse
object containing lists of videos grouped by status. See the Task object page for details on each status.
API Reference: Retrieve import status.
Related guide: Cloud-to-cloud integrations.
Retrieve import logs
Description: This method returns a chronological list of import operations for the specified integration.
Function signature and example:
def import_logs(
self, integration_id: str, **kwargs
) -> RootModelList[models.TransferImportLog]
logs = client.task.transfers.import_logs("<YOUR_INTEGRATION_ID>")
for log in logs:
print(
f"index_id={log.index_id} index_name={log.index_name} created_at={log.created_at} ended_at={log.ended_at} video_status={log.video_status}"
)
if log.failed_files:
for failed_file in log.failed_files:
print(
f"failed_file: {failed_file.filename} {failed_file.error_message}"
)
Parameters:
Name | Type | Required | Description |
---|---|---|---|
integration_id | str | Yes | The unique identifier of the integration for which you want to retrieve the import logs. |
kwargs | dict | No | Additional keyword arguments for the request. |
Return value: Returns a models.TransferImportLog
object containing a chronological list of import operations for the specified integration. The list is sorted by creation date, with the oldest imports first. Each item in the list contains:
- The number of videos in each status
- Detailed error information for failed uploads, including filenames and error messages.
API Reference: Retrieve import logs.
Related guide: Cloud-to-cloud integrations.