Source code for kibana._sync.client

"""Synchronous Kibana client."""

import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from kibana._sync.client.actions import ActionsClient
    from kibana._sync.client.alerting import AlertingClient
    from kibana._sync.client.saved_objects import SavedObjectsClient
    from kibana._sync.client.spaces import SpacesClient
    from kibana._sync.client.status import StatusClient

from elastic_transport import NodeConfig, Transport

from kibana._rate_limiter import RateLimiter
from kibana._sync.client._base import DEFAULT, BaseClient, DefaultType
from kibana.exceptions import SpaceNotFoundError
from kibana.serializer import DEFAULT_SERIALIZERS

# Set up logger
logger = logging.getLogger("kibana")


[docs] class Kibana(BaseClient): """ Synchronous client for Kibana. Provides a Pythonic interface to interact with Kibana's REST APIs. Example usage: >>> from kibana import Kibana >>> client = Kibana( ... hosts=["http://localhost:5601"], ... api_key="your_api_key" ... ) >>> # Use the client >>> client.close() Or use as a context manager: >>> with Kibana(hosts=["http://localhost:5601"]) as client: ... # Use the client ... pass """
[docs] def __init__( self, hosts: str | list[str | dict[str, Any]] | None = None, *, cloud_id: str | None = None, api_key: str | tuple[str, str] | None = None, basic_auth: tuple[str, str] | None = None, bearer_auth: str | None = None, headers: DefaultType | Mapping[str, str] = DEFAULT, request_timeout: DefaultType | None | float = DEFAULT, verify_certs: DefaultType | bool = DEFAULT, ca_certs: DefaultType | str = DEFAULT, client_cert: DefaultType | str = DEFAULT, client_key: DefaultType | str = DEFAULT, ssl_assert_hostname: DefaultType | str = DEFAULT, ssl_assert_fingerprint: DefaultType | str = DEFAULT, ssl_version: DefaultType | int = DEFAULT, ssl_context: DefaultType | Any = DEFAULT, ssl_show_warn: DefaultType | bool = DEFAULT, max_retries: DefaultType | int = DEFAULT, retry_on_status: DefaultType | list[int] = DEFAULT, retry_on_timeout: DefaultType | bool = DEFAULT, connections_per_node: DefaultType | int = DEFAULT, dead_node_backoff_factor: DefaultType | float = DEFAULT, max_dead_node_backoff: DefaultType | float = DEFAULT, node_class: DefaultType | Any = DEFAULT, node_pool_class: DefaultType | Any = DEFAULT, randomize_nodes_in_pool: DefaultType | bool = DEFAULT, max_requests_per_second: float | None = None, _transport: Transport | None = None, ) -> None: """ Initialize Kibana client. :param hosts: List of Kibana nodes to connect to. Can be a single string or a list of strings/dicts. Examples: - "http://localhost:5601" - ["http://localhost:5601", "http://localhost:5602"] - [{"host": "localhost", "port": 5601, "scheme": "http"}] :param cloud_id: Cloud ID for Elastic Cloud deployments :param api_key: API key for authentication. Can be: - Base64-encoded string - Tuple of (id, api_key) :param basic_auth: Basic authentication credentials as (username, password) :param bearer_auth: Bearer token for authentication :param headers: Custom headers to include in all requests :param request_timeout: Request timeout in seconds :param verify_certs: Whether to verify SSL certificates :param ca_certs: Path to CA certificate bundle :param client_cert: Path to client certificate :param client_key: Path to client private key :param ssl_assert_hostname: Hostname to verify in SSL certificate :param ssl_assert_fingerprint: SSL certificate fingerprint to verify :param ssl_version: SSL/TLS version to use :param ssl_context: Custom SSL context :param ssl_show_warn: Whether to show SSL warnings :param max_retries: Maximum number of retries for failed requests :param retry_on_status: HTTP status codes to retry on :param retry_on_timeout: Whether to retry on timeout :param connections_per_node: Number of connections per node :param dead_node_backoff_factor: Backoff factor for dead nodes :param max_dead_node_backoff: Maximum backoff time for dead nodes :param node_class: Custom node class :param node_pool_class: Custom node pool class :param randomize_nodes_in_pool: Whether to randomize node order :param max_requests_per_second: Optional rate limit (requests/sec). When set, outgoing requests are throttled using a token-bucket algorithm to prevent overwhelming the Kibana cluster. :param _transport: Pre-configured Transport instance (for testing) """ # If transport is provided (for testing), use it directly if _transport is not None: super().__init__(_transport=_transport) # Store auth credentials for options() method self._api_key = api_key self._basic_auth = basic_auth self._bearer_auth = bearer_auth self._request_timeout = ( request_timeout if not isinstance(request_timeout, DefaultType) else None ) self._custom_headers = ( headers if not isinstance(headers, DefaultType) else None ) if max_requests_per_second is not None: self._rate_limiter = RateLimiter(max_requests_per_second) return # Validate that either hosts or cloud_id is provided if hosts is None and cloud_id is None: raise ValueError("Either 'hosts' or 'cloud_id' must be provided") # Build node configurations node_configs = self._build_node_configs(hosts, cloud_id) # Build transport options transport_kwargs: dict[str, Any] = { "node_configs": node_configs, "serializers": DEFAULT_SERIALIZERS, } # Note: SSL/TLS options like verify_certs, ca_certs, client_cert, client_key # are configured on NodeConfig, not Transport. They are accepted here for # API compatibility but stored for future use when creating SSL contexts. # For now, we just accept them without error. # Add retry options (these are valid Transport parameters) if not isinstance(max_retries, DefaultType): transport_kwargs["max_retries"] = max_retries if not isinstance(retry_on_status, DefaultType): transport_kwargs["retry_on_status"] = retry_on_status if not isinstance(retry_on_timeout, DefaultType): transport_kwargs["retry_on_timeout"] = retry_on_timeout # Add node pool options (these are valid Transport parameters) if not isinstance(node_class, DefaultType): transport_kwargs["node_class"] = node_class if not isinstance(node_pool_class, DefaultType): transport_kwargs["node_pool_class"] = node_pool_class if not isinstance(randomize_nodes_in_pool, DefaultType): transport_kwargs["randomize_nodes_in_pool"] = randomize_nodes_in_pool # Create Transport instance transport = Transport(**transport_kwargs) # Initialize base client with transport super().__init__(_transport=transport) # Store authentication credentials self._api_key = api_key self._basic_auth = basic_auth self._bearer_auth = bearer_auth # Store request timeout if not isinstance(request_timeout, DefaultType): self._request_timeout = request_timeout # Store custom headers if not isinstance(headers, DefaultType): self._custom_headers = headers logger.info("Kibana client initialized with %d node(s)", len(node_configs)) # Set up rate limiting if configured if max_requests_per_second is not None: self._rate_limiter = RateLimiter(max_requests_per_second) logger.info( "Rate limiting enabled: %.1f requests/sec", max_requests_per_second )
def _build_node_configs( self, hosts: str | list[str | dict[str, Any]] | None, cloud_id: str | None, ) -> list[NodeConfig]: """ Build NodeConfig objects from hosts or cloud_id. :param hosts: Host specifications :param cloud_id: Cloud ID for Elastic Cloud :return: List of NodeConfig objects """ if cloud_id is not None: # Parse cloud_id and create NodeConfig # Cloud ID format: cluster_name:base64_encoded_data # The base64 data contains: cloud_host$es_uuid$kibana_uuid import base64 try: _, encoded = cloud_id.split(":", 1) decoded = base64.b64decode(encoded).decode("utf-8") parts = decoded.split("$") if len(parts) >= 3: cloud_host = parts[0] kibana_uuid = parts[2] # Construct Kibana URL return [ NodeConfig( scheme="https", host=f"{kibana_uuid}.{cloud_host}", port=443 ) ] else: raise ValueError(f"Invalid cloud_id format: {cloud_id}") except Exception as e: raise ValueError(f"Failed to parse cloud_id: {e}") # Parse hosts if isinstance(hosts, str): hosts = [hosts] if hosts is None: raise ValueError("hosts cannot be None when cloud_id is not provided") node_configs = [] for host in hosts: if isinstance(host, str): # Parse URL string manually from urllib.parse import urlparse parsed = urlparse(host) scheme = parsed.scheme or "http" hostname = parsed.hostname or "localhost" port = parsed.port or (443 if scheme == "https" else 5601) node_config = NodeConfig( scheme=scheme, host=hostname, port=port, path_prefix=( parsed.path if parsed.path and parsed.path != "/" else "" ), ) node_configs.append(node_config) elif isinstance(host, dict): # Create NodeConfig from dict node_config = NodeConfig(**host) node_configs.append(node_config) else: raise ValueError(f"Invalid host specification: {host}") return node_configs
[docs] def close(self) -> None: """ Close the client and release resources. This closes all connections in the connection pool. After calling close(), the client should not be used. """ try: self._transport.close() logger.debug("Kibana client closed") except Exception as e: logger.warning("Error closing Kibana client: %s", e)
[docs] def __enter__(self) -> "Kibana": """Enter context manager.""" return self
[docs] def __exit__(self, *args: Any) -> None: """Exit context manager and close client.""" self.close()
[docs] def __repr__(self) -> str: """Return string representation of client.""" return "<Kibana()>"
[docs] def space(self, space_id: str, validate: bool = True) -> "SpaceScopedKibana": """ Create a space-scoped client instance. This method creates a new client instance that automatically operates within the specified space context. All operations performed through the returned client will be scoped to the specified space. :param space_id: The ID of the space to scope operations to :param validate: Whether to validate that the space exists (default: True) :return: SpaceScopedKibana instance scoped to the specified space :raises SpaceNotFoundError: If validate=True and the space doesn't exist :raises InvalidSpaceIdError: If the space_id format is invalid Example: >>> # Create a space-scoped client with validation >>> marketing_client = client.space("marketing") >>> >>> # Create connector in the marketing space >>> connector = marketing_client.actions.create( ... name="Marketing Webhook", ... connector_type_id=".webhook", ... config={"url": "https://marketing.example.com/webhook"} ... ) >>> >>> # Create space-scoped client without validation (for performance) >>> fast_client = client.space("marketing", validate=False) """ return SpaceScopedKibana(self, space_id, validate)
@property def actions(self) -> "ActionsClient": """ Access the Actions API for managing Kibana action connectors. Actions in Kibana are connectors that enable integration with external systems for alerting, notifications, and automation. :return: ActionsClient instance for managing action connectors Example: >>> # Create a webhook connector >>> connector = client.actions.create( ... name="Alert Webhook", ... connector_type_id=".webhook", ... config={"url": "https://example.com/webhook"} ... ) >>> # List all connectors >>> connectors = client.actions.get_all() >>> # Execute a connector >>> result = client.actions.execute( ... id=connector["id"], ... params={"message": "Test alert"} ... ) """ # Lazy initialization of ActionsClient if not hasattr(self, "_actions_client"): from kibana._sync.client.actions import ActionsClient self._actions_client = ActionsClient(self) return self._actions_client @property def spaces(self) -> "SpacesClient": """ Access the Spaces API for managing Kibana Spaces. Spaces allow you to organize your Kibana objects (dashboards, visualizations, etc.) into separate, isolated areas. Each space can have its own set of saved objects and can be used to implement multi-tenancy. :return: SpacesClient instance for managing spaces Example: >>> # Create a new space >>> space = client.spaces.create( ... id="marketing", ... name="Marketing Team", ... description="Space for marketing team" ... ) >>> # List all spaces >>> spaces = client.spaces.get_all() >>> # Update a space >>> updated = client.spaces.update( ... id="marketing", ... name="Marketing Department" ... ) >>> # Delete a space >>> client.spaces.delete(id="marketing") """ # Lazy initialization of SpacesClient if not hasattr(self, "_spaces_client"): from kibana._sync.client.spaces import SpacesClient self._spaces_client = SpacesClient(self) return self._spaces_client @property def saved_objects(self) -> "SavedObjectsClient": """ Access the Saved Objects API for managing Kibana saved objects. Saved Objects in Kibana are entities like dashboards, visualizations, index patterns, and other configuration items. This API provides methods to create, read, update, and delete saved objects. :return: SavedObjectsClient instance for managing saved objects Example: >>> # Create a dashboard >>> dashboard = client.saved_objects.create( ... type="dashboard", ... attributes={"title": "My Dashboard"} ... ) >>> # Get a saved object >>> obj = client.saved_objects.get( ... type="dashboard", ... id="my-dashboard-id" ... ) >>> # Update a saved object >>> updated = client.saved_objects.update( ... type="dashboard", ... id="my-dashboard-id", ... attributes={"title": "Updated Dashboard"} ... ) >>> # Delete a saved object >>> client.saved_objects.delete( ... type="dashboard", ... id="my-dashboard-id" ... ) """ # Lazy initialization of SavedObjectsClient if not hasattr(self, "_saved_objects_client"): from kibana._sync.client.saved_objects import SavedObjectsClient self._saved_objects_client = SavedObjectsClient(self) return self._saved_objects_client @property def status(self) -> "StatusClient": """ Access the Status API for monitoring Kibana server health and statistics. The Status API provides information about the Kibana server's operational state, including overall health status, individual service statuses, and detailed operational metrics. :return: StatusClient instance for monitoring server status Example: >>> # Get current Kibana status >>> status = client.status.get_status() >>> print(status.body["status"]["overall"]["level"]) # available, degraded, or unavailable >>> # Get operational statistics >>> stats = client.status.get_stats() >>> print(stats.body["process"]["uptime_in_millis"]) >>> print(stats.body["os"]["load"]) """ # Lazy initialization of StatusClient if not hasattr(self, "_status_client"): from kibana._sync.client.status import StatusClient self._status_client = StatusClient(self) return self._status_client @property def alerting(self) -> "AlertingClient": """ Access the Alerting API for managing rules. :return: AlertingClient instance for managing rules. """ # Lazy initialization of AlertingClient if not hasattr(self, "_alerting_client"): from kibana._sync.client.alerting import AlertingClient self._alerting_client = AlertingClient(self) return self._alerting_client
[docs] class SpaceScopedKibana: """ Space-scoped client that delegates to main client with space context. This class provides the same API surface as the main Kibana client but automatically scopes all operations to a specific space. All child clients (actions, saved_objects, etc.) created through this instance will inherit the space context and validation settings. Example: >>> # Create space-scoped client with validation >>> marketing_client = client.space("marketing") >>> >>> # All operations are automatically scoped to "marketing" space >>> connector = marketing_client.actions.create( ... name="Marketing Webhook", ... connector_type_id=".webhook", ... config={"url": "https://marketing.example.com/webhook"} ... ) >>> >>> # Create space-scoped client without validation for performance >>> fast_client = client.space("marketing", validate=False) """
[docs] def __init__(self, client: Kibana, space_id: str, validate: bool = True) -> None: """ Initialize space-scoped client. :param client: The main Kibana client to delegate to :param space_id: The space ID to scope operations to :param validate: Whether to validate that the space exists :raises SpaceNotFoundError: If validate=True and the space doesn't exist """ self._client = client self._space_id = space_id self._validate = validate # Validate space exists immediately if validation is enabled if validate: self._validate_space_on_creation()
def _validate_space_on_creation(self) -> None: """ Validate space exists when creating space-scoped client. :raises SpaceNotFoundError: If the space doesn't exist """ try: self._client.spaces.get(id=self._space_id) except Exception as e: # Check if this is a "not found" error error_str = str(e).lower() if "not found" in error_str or "404" in error_str: raise SpaceNotFoundError(self._space_id) else: # Re-raise other errors (auth, network, etc.) raise @property def actions(self) -> "ActionsClient": """ Get ActionsClient with space scope. Returns an ActionsClient instance that automatically operates within the space context of this SpaceScopedKibana instance. :return: ActionsClient scoped to this space Example: >>> marketing_client = client.space("marketing") >>> # This connector will be created in the "marketing" space >>> connector = marketing_client.actions.create( ... name="Marketing Webhook", ... connector_type_id=".webhook", ... config={"url": "https://marketing.example.com/webhook"} ... ) """ if not hasattr(self, "_actions_client"): from kibana._sync.client.actions import ActionsClient self._actions_client = ActionsClient( self._client, default_space_id=self._space_id, validate_spaces=self._validate, ) return self._actions_client @property def saved_objects(self) -> "SavedObjectsClient": """ Get SavedObjectsClient with space scope. Returns a SavedObjectsClient instance that automatically operates within the space context of this SpaceScopedKibana instance. :return: SavedObjectsClient scoped to this space Example: >>> marketing_client = client.space("marketing") >>> # This dashboard will be created in the "marketing" space >>> dashboard = marketing_client.saved_objects.create( ... type="dashboard", ... attributes={"title": "Marketing Dashboard"} ... ) """ if not hasattr(self, "_saved_objects_client"): from kibana._sync.client.saved_objects import SavedObjectsClient self._saved_objects_client = SavedObjectsClient( self._client, default_space_id=self._space_id, validate_spaces=self._validate, ) return self._saved_objects_client @property def spaces(self) -> "SpacesClient": """ Get SpacesClient (not space-scoped). The SpacesClient is used for managing spaces themselves and is not scoped to a particular space. It uses the same client as the parent Kibana instance. :return: SpacesClient for managing spaces """ return self._client.spaces @property def status(self) -> "StatusClient": """ Get StatusClient (not space-scoped). The StatusClient provides server-wide status information and is not scoped to a particular space. It uses the same client as the parent Kibana instance. :return: StatusClient for monitoring server status """ return self._client.status
[docs] def close(self) -> None: """ Close the underlying client and release resources. This delegates to the main Kibana client's close() method. """ self._client.close()
[docs] def __enter__(self) -> "SpaceScopedKibana": """Enter context manager.""" return self
[docs] def __exit__(self, *args: Any) -> None: """Exit context manager and close client.""" self.close()
[docs] def __repr__(self) -> str: """Return string representation of space-scoped client.""" return f"<SpaceScopedKibana(space_id='{self._space_id}', validate={self._validate})>"