Source code for kibana._async.client

"""Asynchronous Kibana client."""

import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

from elastic_transport import AsyncTransport, NodeConfig

from kibana._async.client._base import DEFAULT, AsyncBaseClient, DefaultType
from kibana._rate_limiter import AsyncRateLimiter
from kibana.exceptions import SpaceNotFoundError
from kibana.serializer import DEFAULT_SERIALIZERS

__all__ = ["AsyncKibana", "AsyncSpaceScopedKibana", "DEFAULT", "DefaultType"]

if TYPE_CHECKING:
    from kibana._async.client.actions import AsyncActionsClient
    from kibana._async.client.alerting import AsyncAlertingClient
    from kibana._async.client.saved_objects import AsyncSavedObjectsClient
    from kibana._async.client.spaces import AsyncSpacesClient
    from kibana._async.client.status import AsyncStatusClient

# Set up logger
logger = logging.getLogger("kibana")


[docs] class AsyncKibana(AsyncBaseClient): """ Asynchronous client for Kibana. Provides a Pythonic async interface to interact with Kibana's REST APIs. Example usage: >>> from kibana import AsyncKibana >>> client = AsyncKibana( ... hosts=["http://localhost:5601"], ... api_key="your_api_key" ... ) >>> # Use the client >>> await client.close() Or use as an async context manager: >>> async with AsyncKibana(hosts=["http://localhost:5601"]) as client: ... # Use the client ... pass """
[docs] def __init__( self, hosts: str | list[str | dict[str, Any]] | None = None, *, cloud_id: str | None = None, api_key: str | tuple[str, str] | None = None, basic_auth: tuple[str, str] | None = None, bearer_auth: str | None = None, headers: DefaultType | Mapping[str, str] = DEFAULT, request_timeout: DefaultType | None | float = DEFAULT, verify_certs: DefaultType | bool = DEFAULT, ca_certs: DefaultType | str = DEFAULT, client_cert: DefaultType | str = DEFAULT, client_key: DefaultType | str = DEFAULT, ssl_assert_hostname: DefaultType | str = DEFAULT, ssl_assert_fingerprint: DefaultType | str = DEFAULT, ssl_version: DefaultType | int = DEFAULT, ssl_context: DefaultType | Any = DEFAULT, ssl_show_warn: DefaultType | bool = DEFAULT, max_retries: DefaultType | int = DEFAULT, retry_on_status: DefaultType | list[int] = DEFAULT, retry_on_timeout: DefaultType | bool = DEFAULT, connections_per_node: DefaultType | int = DEFAULT, dead_node_backoff_factor: DefaultType | float = DEFAULT, max_dead_node_backoff: DefaultType | float = DEFAULT, node_class: DefaultType | Any = DEFAULT, node_pool_class: DefaultType | Any = DEFAULT, randomize_nodes_in_pool: DefaultType | bool = DEFAULT, max_requests_per_second: float | None = None, _transport: AsyncTransport | None = None, ) -> None: """ Initialize AsyncKibana client. :param hosts: List of Kibana nodes to connect to. Can be a single string or a list of strings/dicts. Examples: - "http://localhost:5601" - ["http://localhost:5601", "http://localhost:5602"] - [{"host": "localhost", "port": 5601, "scheme": "http"}] :param cloud_id: Cloud ID for Elastic Cloud deployments :param api_key: API key for authentication. Can be: - Base64-encoded string - Tuple of (id, api_key) :param basic_auth: Basic authentication credentials as (username, password) :param bearer_auth: Bearer token for authentication :param headers: Custom headers to include in all requests :param request_timeout: Request timeout in seconds :param verify_certs: Whether to verify SSL certificates :param ca_certs: Path to CA certificate bundle :param client_cert: Path to client certificate :param client_key: Path to client private key :param ssl_assert_hostname: Hostname to verify in SSL certificate :param ssl_assert_fingerprint: SSL certificate fingerprint to verify :param ssl_version: SSL/TLS version to use :param ssl_context: Custom SSL context :param ssl_show_warn: Whether to show SSL warnings :param max_retries: Maximum number of retries for failed requests :param retry_on_status: HTTP status codes to retry on :param retry_on_timeout: Whether to retry on timeout :param connections_per_node: Number of connections per node :param dead_node_backoff_factor: Backoff factor for dead nodes :param max_dead_node_backoff: Maximum backoff time for dead nodes :param node_class: Custom node class :param node_pool_class: Custom node pool class :param randomize_nodes_in_pool: Whether to randomize node order :param max_requests_per_second: Optional rate limit (requests/sec). When set, outgoing requests are throttled using a token-bucket algorithm to prevent overwhelming the Kibana cluster. :param _transport: Pre-configured AsyncTransport instance (for testing) """ # If transport is provided (for testing), use it directly if _transport is not None: super().__init__(_transport=_transport) # Store auth credentials for options() method self._api_key = api_key self._basic_auth = basic_auth self._bearer_auth = bearer_auth self._request_timeout = ( request_timeout if not isinstance(request_timeout, DefaultType) else None ) self._custom_headers = ( headers if not isinstance(headers, DefaultType) else None ) if max_requests_per_second is not None: self._rate_limiter = AsyncRateLimiter(max_requests_per_second) return # Validate that either hosts or cloud_id is provided if hosts is None and cloud_id is None: raise ValueError("Either 'hosts' or 'cloud_id' must be provided") # Build node configurations node_configs = self._build_node_configs(hosts, cloud_id) # Build transport options transport_kwargs: dict[str, Any] = { "node_configs": node_configs, "serializers": DEFAULT_SERIALIZERS, } # Note: SSL/TLS options like verify_certs, ca_certs, client_cert, client_key # are configured on NodeConfig, not Transport. They are accepted here for # API compatibility but stored for future use when creating SSL contexts. # For now, we just accept them without error. # Add retry options (these are valid Transport parameters) if not isinstance(max_retries, DefaultType): transport_kwargs["max_retries"] = max_retries if not isinstance(retry_on_status, DefaultType): transport_kwargs["retry_on_status"] = retry_on_status if not isinstance(retry_on_timeout, DefaultType): transport_kwargs["retry_on_timeout"] = retry_on_timeout # Add node pool options (these are valid Transport parameters) if not isinstance(node_class, DefaultType): transport_kwargs["node_class"] = node_class if not isinstance(node_pool_class, DefaultType): transport_kwargs["node_pool_class"] = node_pool_class if not isinstance(randomize_nodes_in_pool, DefaultType): transport_kwargs["randomize_nodes_in_pool"] = randomize_nodes_in_pool # Create AsyncTransport instance transport = AsyncTransport(**transport_kwargs) # Initialize base client with transport super().__init__(_transport=transport) # Store authentication credentials self._api_key = api_key self._basic_auth = basic_auth self._bearer_auth = bearer_auth # Store request timeout if not isinstance(request_timeout, DefaultType): self._request_timeout = request_timeout # Store custom headers if not isinstance(headers, DefaultType): self._custom_headers = headers logger.info("AsyncKibana client initialized with %d node(s)", len(node_configs)) # Set up rate limiting if configured if max_requests_per_second is not None: self._rate_limiter = AsyncRateLimiter(max_requests_per_second) logger.info( "Rate limiting enabled: %.1f requests/sec", max_requests_per_second )
def _build_node_configs( self, hosts: str | list[str | dict[str, Any]] | None, cloud_id: str | None, ) -> list[NodeConfig]: """ Build NodeConfig objects from hosts or cloud_id. :param hosts: Host specifications :param cloud_id: Cloud ID for Elastic Cloud :return: List of NodeConfig objects """ if cloud_id is not None: # Parse cloud_id and create NodeConfig # Cloud ID format: cluster_name:base64_encoded_data # The base64 data contains: cloud_host$es_uuid$kibana_uuid import base64 try: _, encoded = cloud_id.split(":", 1) decoded = base64.b64decode(encoded).decode("utf-8") parts = decoded.split("$") if len(parts) >= 3: cloud_host = parts[0] kibana_uuid = parts[2] # Construct Kibana URL return [ NodeConfig( scheme="https", host=f"{kibana_uuid}.{cloud_host}", port=443 ) ] else: raise ValueError(f"Invalid cloud_id format: {cloud_id}") except Exception as e: raise ValueError(f"Failed to parse cloud_id: {e}") # Parse hosts if isinstance(hosts, str): hosts = [hosts] if hosts is None: raise ValueError("hosts cannot be None when cloud_id is not provided") node_configs = [] for host in hosts: if isinstance(host, str): # Parse URL string manually from urllib.parse import urlparse parsed = urlparse(host) scheme = parsed.scheme or "http" hostname = parsed.hostname or "localhost" port = parsed.port or (443 if scheme == "https" else 5601) node_config = NodeConfig( scheme=scheme, host=hostname, port=port, path_prefix=( parsed.path if parsed.path and parsed.path != "/" else "" ), ) node_configs.append(node_config) elif isinstance(host, dict): # Create NodeConfig from dict node_config = NodeConfig(**host) node_configs.append(node_config) else: raise ValueError(f"Invalid host specification: {host}") return node_configs
[docs] async def close(self) -> None: """ Close the client and release resources. This closes all connections in the connection pool. After calling close(), the client should not be used. """ try: await self._transport.close() logger.debug("AsyncKibana client closed") except Exception as e: logger.warning("Error closing AsyncKibana client: %s", e)
[docs] async def __aenter__(self) -> "AsyncKibana": """Enter async context manager.""" return self
[docs] async def __aexit__(self, *args: Any) -> None: """Exit async context manager and close client.""" await self.close()
@property def actions(self) -> "AsyncActionsClient": """ Access the Actions API for managing Kibana action connectors. Actions in Kibana are connectors that enable integration with external systems for alerting, notifications, and automation. :return: AsyncActionsClient instance for managing action connectors Example: >>> # Create a webhook connector >>> connector = await client.actions.create( ... name="Alert Webhook", ... connector_type_id=".webhook", ... config={"url": "https://example.com/webhook"} ... ) >>> # List all connectors >>> connectors = await client.actions.get_all() >>> # Execute a connector >>> result = await client.actions.execute( ... id=connector.body["id"], ... params={"message": "Test alert"} ... ) """ # Lazy initialization of AsyncActionsClient if not hasattr(self, "_actions_client"): from kibana._async.client.actions import AsyncActionsClient self._actions_client = AsyncActionsClient(self) return self._actions_client @property def spaces(self) -> "AsyncSpacesClient": """ Get the Spaces client for managing Kibana Spaces. :return: AsyncSpacesClient instance """ if not hasattr(self, "_spaces_client"): from kibana._async.client.spaces import AsyncSpacesClient self._spaces_client = AsyncSpacesClient(self) return self._spaces_client @property def status(self) -> "AsyncStatusClient": """ Get the Status client for checking Kibana status. :return: AsyncStatusClient instance """ if not hasattr(self, "_status_client"): from kibana._async.client.status import AsyncStatusClient self._status_client = AsyncStatusClient(self) return self._status_client @property def saved_objects(self) -> "AsyncSavedObjectsClient": """ Access the Saved Objects API for managing Kibana saved objects. Saved Objects in Kibana are entities like dashboards, visualizations, index patterns, and other configuration items. This API provides methods to create, read, update, and delete saved objects. :return: AsyncSavedObjectsClient instance for managing saved objects Example: >>> # Create a dashboard >>> dashboard = await client.saved_objects.create( ... type="dashboard", ... attributes={"title": "My Dashboard"} ... ) >>> # Get a saved object >>> obj = await client.saved_objects.get( ... type="dashboard", ... id="my-dashboard-id" ... ) >>> # Update a saved object >>> updated = await client.saved_objects.update( ... type="dashboard", ... id="my-dashboard-id", ... attributes={"title": "Updated Dashboard"} ... ) >>> # Delete a saved object >>> await client.saved_objects.delete( ... type="dashboard", ... id="my-dashboard-id" ... ) """ # Lazy initialization of AsyncSavedObjectsClient if not hasattr(self, "_saved_objects_client"): from kibana._async.client.saved_objects import AsyncSavedObjectsClient self._saved_objects_client = AsyncSavedObjectsClient(self) return self._saved_objects_client @property def alerting(self) -> "AsyncAlertingClient": """ Access the Alerting API for managing rules. :return: AsyncAlertingClient instance for managing rules. """ # Lazy initialization of AsyncAlertingClient if not hasattr(self, "_alerting_client"): from kibana._async.client.alerting import AsyncAlertingClient self._alerting_client = AsyncAlertingClient(self) return self._alerting_client
[docs] def space(self, space_id: str, validate: bool = True) -> "AsyncSpaceScopedKibana": """ Create a space-scoped client instance. This method creates a new client instance that automatically operates within the specified space context. All operations performed through the returned client will be scoped to the specified space. :param space_id: The ID of the space to scope operations to :param validate: Whether to validate that the space exists (default: True) :return: AsyncSpaceScopedKibana instance scoped to the specified space :raises SpaceNotFoundError: If validate=True and the space doesn't exist :raises InvalidSpaceIdError: If the space_id format is invalid Example: >>> # Create a space-scoped client with validation >>> marketing_client = await client.space("marketing") >>> >>> # Create connector in the marketing space >>> connector = await marketing_client.actions.create( ... name="Marketing Webhook", ... connector_type_id=".webhook", ... config={"url": "https://marketing.example.com/webhook"} ... ) >>> >>> # Create space-scoped client without validation (for performance) >>> fast_client = await client.space("marketing", validate=False) """ return AsyncSpaceScopedKibana(self, space_id, validate)
[docs] def __repr__(self) -> str: """Return string representation of client.""" return "<AsyncKibana()>"
[docs] class AsyncSpaceScopedKibana: """ Space-scoped async client that delegates to main client with space context. This class provides the same API surface as the main AsyncKibana client but automatically scopes all operations to a specific space. All child clients (actions, saved_objects, etc.) created through this instance will inherit the space context and validation settings. Example: >>> # Create space-scoped client with validation >>> marketing_client = client.space("marketing") >>> >>> # All operations are automatically scoped to "marketing" space >>> connector = await marketing_client.actions.create( ... name="Marketing Webhook", ... connector_type_id=".webhook", ... config={"url": "https://marketing.example.com/webhook"} ... ) >>> >>> # Create space-scoped client without validation for performance >>> fast_client = client.space("marketing", validate=False) """
[docs] def __init__( self, client: AsyncKibana, space_id: str, validate: bool = True ) -> None: """ Initialize space-scoped async client. :param client: The main AsyncKibana client to delegate to :param space_id: The space ID to scope operations to :param validate: Whether to validate that the space exists :raises SpaceNotFoundError: If validate=True and the space doesn't exist """ self._client = client self._space_id = space_id self._validate = validate
# Note: For async, we can't validate synchronously in __init__ # Validation will happen on first use if enabled async def _validate_space_on_creation(self) -> None: """ Validate space exists when creating space-scoped client. :raises SpaceNotFoundError: If the space doesn't exist """ try: await self._client.spaces.get(id=self._space_id) except Exception as e: # Check if this is a "not found" error error_str = str(e).lower() if "not found" in error_str or "404" in error_str: raise SpaceNotFoundError(self._space_id) else: # Re-raise other errors (auth, network, etc.) raise @property def actions(self) -> "AsyncActionsClient": """ Get AsyncActionsClient with space scope. Returns an AsyncActionsClient instance that automatically operates within the space context of this AsyncSpaceScopedKibana instance. :return: AsyncActionsClient scoped to this space Example: >>> marketing_client = client.space("marketing") >>> # This connector will be created in the "marketing" space >>> connector = await marketing_client.actions.create( ... name="Marketing Webhook", ... connector_type_id=".webhook", ... config={"url": "https://marketing.example.com/webhook"} ... ) """ if not hasattr(self, "_actions_client"): from kibana._async.client.actions import AsyncActionsClient self._actions_client = AsyncActionsClient( self._client, default_space_id=self._space_id, validate_spaces=self._validate, ) return self._actions_client @property def saved_objects(self) -> "AsyncSavedObjectsClient": """ Get AsyncSavedObjectsClient with space scope. Returns an AsyncSavedObjectsClient instance that automatically operates within the space context of this AsyncSpaceScopedKibana instance. :return: AsyncSavedObjectsClient scoped to this space Example: >>> marketing_client = client.space("marketing") >>> # This dashboard will be created in the "marketing" space >>> dashboard = await marketing_client.saved_objects.create( ... type="dashboard", ... attributes={"title": "Marketing Dashboard"} ... ) """ if not hasattr(self, "_saved_objects_client"): from kibana._async.client.saved_objects import AsyncSavedObjectsClient self._saved_objects_client = AsyncSavedObjectsClient( self._client, default_space_id=self._space_id, validate_spaces=self._validate, ) return self._saved_objects_client @property def spaces(self) -> "AsyncSpacesClient": """ Get AsyncSpacesClient (not space-scoped). The AsyncSpacesClient is used for managing spaces themselves and is not scoped to a particular space. It uses the same client as the parent AsyncKibana instance. :return: AsyncSpacesClient for managing spaces """ return self._client.spaces @property def status(self) -> "AsyncStatusClient": """ Get AsyncStatusClient (not space-scoped). The AsyncStatusClient provides server-wide status information and is not scoped to a particular space. It uses the same client as the parent AsyncKibana instance. :return: AsyncStatusClient for monitoring server status """ return self._client.status
[docs] async def close(self) -> None: """ Close the underlying client and release resources. This delegates to the main AsyncKibana client's close() method. """ await self._client.close()
[docs] async def __aenter__(self) -> "AsyncSpaceScopedKibana": """Enter async context manager.""" return self
[docs] async def __aexit__(self, *args: Any) -> None: """Exit async context manager and close client.""" await self.close()
[docs] def __repr__(self) -> str: """Return string representation of space-scoped client.""" return f"<AsyncSpaceScopedKibana(space_id='{self._space_id}', validate={self._validate})>"