import queue
import threading
from collections import OrderedDict
from typing import Any, Dict, Generator, List, Optional, Set, Union
from cognite.client import utils
from cognite.client._api.assets import AssetsAPI
from cognite.client.data_classes import TimestampRange
from cognite.client.exceptions import CogniteAPIError
from cognite.experimental.data_classes import Asset, AssetFilter, AssetList, AssetUpdate
from cognite.experimental.utils import use_v1_instead_of_playground
class ExperimentalAssetsAPI(AssetsAPI):
_RESOURCE_PATH = "/assets"
_LIST_CLASS = AssetList
def __call__(
    self,
    chunk_size: int = None,
    name: str = None,
    parent_ids: List[int] = None,
    root_ids: List[int] = None,
    root_external_ids: List[str] = None,
    asset_subtree_ids: List[int] = None,
    asset_subtree_external_ids: List[str] = None,
    metadata: Dict[str, str] = None,
    data_set_ids: List[int] = None,
    data_set_external_ids: List[str] = None,
    source: str = None,
    created_time: Union[Dict[str, Any], TimestampRange] = None,
    last_updated_time: Union[Dict[str, Any], TimestampRange] = None,
    root: bool = None,
    external_id_prefix: str = None,
    types: List = None,
    labels: List = None,
    aggregated_properties: List[str] = None,
    limit: int = None,
) -> Generator[Union[Asset, AssetList], None, None]:
    """Iterate over assets

    Assets are fetched as the iteration progresses, so only a limited number of assets are held in memory.

    Args:
        chunk_size (int, optional): Number of assets to return in each chunk. Defaults to yielding one asset a time.
        name (str): Name of asset. Often referred to as tag.
        parent_ids (List[int]): Return only the direct descendants of the specified assets.
        root_ids (List[int], optional): List of root ids to filter on.
        root_external_ids (List[str], optional): List of root external ids to filter on.
        asset_subtree_ids (List[int]): List of asset subtrees ids to filter on.
        asset_subtree_external_ids (List[str]): List of asset subtrees external ids to filter on.
        metadata (Dict[str, str]): Custom, application specific metadata. String key -> String value
        data_set_ids (List[int]): Return only assets in the specified data sets with these ids.
        data_set_external_ids (List[str]): Return only assets in the specified data sets with these external ids.
        source (str): The source of this asset
        created_time (Union[Dict[str, int], TimestampRange]): Range between two timestamps. Possible keys are `min` and `max`, with values given as time stamps in ms.
        last_updated_time (Union[Dict[str, int], TimestampRange]): Range between two timestamps. Possible keys are `min` and `max`, with values given as time stamps in ms.
        root (bool): filtered assets are root assets or not
        external_id_prefix (str): Filter by this (case-sensitive) prefix for the external ID.
        types (List): List of type filters.
        labels (List): List of label filters
        aggregated_properties (List[str]): Set of aggregated properties to include.
        limit (int, optional): Maximum number of assets to return. Defaults to return all items.

    Yields:
        Union[Asset, AssetList]: yields Asset one by one if chunk is not specified, else AssetList objects.
    """
    # The request expects camelCase property names, so convert them up front.
    if aggregated_properties:
        aggregated_properties = list(map(utils._auxiliary.to_camel_case, aggregated_properties))

    # dict option for backward compatibility: root ids that are already given
    # as dicts are passed through untouched unless external ids are supplied.
    has_plain_root_ids = root_ids and not isinstance(root_ids[0], dict)
    if has_plain_root_ids or root_external_ids:
        root_ids = self._process_ids(root_ids, root_external_ids, wrap_ids=True)

    # Ids and external ids are merged into a single list per filter field.
    if asset_subtree_ids or asset_subtree_external_ids:
        asset_subtree_ids = self._process_ids(asset_subtree_ids, asset_subtree_external_ids, wrap_ids=True)
    if data_set_ids or data_set_external_ids:
        data_set_ids = self._process_ids(data_set_ids, data_set_external_ids, wrap_ids=True)

    asset_filter = AssetFilter(
        name=name,
        parent_ids=parent_ids,
        root_ids=root_ids,
        asset_subtree_ids=asset_subtree_ids,
        data_set_ids=data_set_ids,
        metadata=metadata,
        source=source,
        created_time=created_time,
        last_updated_time=last_updated_time,
        root=root,
        external_id_prefix=external_id_prefix,
        types=types,
        labels=labels,
    )
    filter_body = asset_filter.dump(camel_case=True)

    # Only send aggregatedProperties when some were actually requested.
    extra_params = {}
    if aggregated_properties:
        extra_params["aggregatedProperties"] = aggregated_properties

    return self._list_generator(
        method="POST",
        chunk_size=chunk_size,
        filter=filter_body,
        limit=limit,
        other_params=extra_params,
    )
def create(self, asset: Union[Asset, List[Asset]]) -> Union[Asset, AssetList]:
    """`Create one or more assets. <https://docs.cognite.com/api/playground/#operation/createAssets>`_

    You can create an arbitrary number of assets, and the SDK will split the request into multiple requests.

    Args:
        asset (Union[Asset, List[Asset]]): Asset or list of assets to create.

    Returns:
        Union[Asset, AssetList]: Created asset(s)

    Examples:

        Create new assets::

            >>> from cognite.experimental import CogniteClient
            >>> from cognite.experimental.data_classes import Asset
            >>> c = CogniteClient()
            >>> assets = [Asset(name="asset1"), Asset(name="asset2")]
            >>> res = c.assets_playground.create(assets)
    """
    # Reject anything that is neither a single Asset nor a list before delegating.
    accepted_types = [Asset, list]
    utils._auxiliary.assert_type(asset, "asset", accepted_types)
    created = self._create_multiple(asset)
    return created
def retrieve(self, id: Optional[int] = None, external_id: Optional[str] = None) -> Optional[Asset]:
    """`Retrieve a single asset by id. <https://docs.cognite.com/api/playground/#operation/getAsset>`_

    Args:
        id (int, optional): ID
        external_id (str, optional): External ID

    Returns:
        Optional[Asset]: Requested asset or None if it does not exist.

    Examples:

        Get asset by id::

            >>> from cognite.experimental import CogniteClient
            >>> c = CogniteClient()
            >>> res = c.assets_playground.retrieve(id=1)

        Get asset by external id::

            >>> from cognite.experimental import CogniteClient
            >>> c = CogniteClient()
            >>> res = c.assets_playground.retrieve(external_id="1")
    """
    # Validate the identifier pair first; the lookup itself is delegated.
    utils._auxiliary.assert_exactly_one_of_id_or_external_id(id, external_id)
    lookup_kwargs = {"ids": id, "external_ids": external_id, "wrap_ids": True}
    return self._retrieve_multiple(**lookup_kwargs)
def retrieve_multiple(
self,
ids: Optional[List[int]] = None,
external_ids: Optional[List[str]] = None,
ignore_unknown_ids: bool = False,
) -> AssetList:
"""`Retrieve multiple assets by id. <https://docs.cognite.com/api/playground/#operation/byIdsAssets>`_
Args:
ids (List[int], optional): IDs
external_ids (List[str], optional): External IDs
ignore_unknown_ids (bool): Ignore IDs and external IDs that are not found rather than throw an exception.
Returns:
AssetList: The requested assets.
Examples:
Get assets by id::
>>> from cognite.experimental import CogniteClient
>>> c = CogniteClient()
>>> res = c.assets_playground.retrieve_multiple(ids=[1, 2, 3])
Get assets by external id::
>>> from cognite.experimental import CogniteClient
>>> c = CogniteClient()