StatusClient
Client for monitoring Kibana server health and statistics through the Status API.
The Status API provides information about the Kibana server’s operational state, including overall health status, individual service statuses, and detailed operational metrics.
- class kibana._sync.client.status.StatusClient(client)
Bases: NamespaceClient
Client for Kibana Status API operations.
The Status API provides health and operational information about the Kibana server and its dependencies. This is useful for monitoring, health checks, and troubleshooting.
- Status levels:
available: All services are operational
degraded: Some services are experiencing issues but Kibana is functional
unavailable: Critical services are down, Kibana may not be functional
Example
>>> from kibana import Kibana
>>> client = Kibana("http://localhost:5601", api_key="...")
>>>
>>> # Check overall status
>>> status = client.status.get_status()
>>> print(status.body["status"]["overall"]["level"])
available
>>>
>>> # Check individual service statuses
>>> for service, info in status.body["status"]["statuses"].items():
...     print(f"{service}: {info['level']}")
elasticsearch: available
savedObjects: available
>>>
>>> # Get operational statistics
>>> stats = client.status.get_stats()
>>> print(stats.body["process"]["memory"]["heap"]["used_in_bytes"])
Overview
The StatusClient provides methods to check Kibana server health and retrieve operational statistics. This is useful for monitoring, alerting, and health checks.
Checking Server Status
Get the current Kibana server status with the get_status() method:

    from kibana import Kibana

    client = Kibana("http://localhost:5601")

    # Get server status
    status = client.status.get_status()

    # Check overall status level
    overall_status = status.body["status"]["overall"]["level"]
    print(f"Kibana status: {overall_status}")

    # Status levels: "available", "degraded", or "unavailable"
    if overall_status == "available":
        print("✓ Kibana is healthy")
    elif overall_status == "degraded":
        print("⚠ Kibana is degraded")
    else:
        print("✗ Kibana is unavailable")
Status Response Structure
The status response includes detailed information about each service:
    status = client.status.get_status()

    # Overall status
    overall = status.body["status"]["overall"]
    print(f"Overall: {overall['level']} - {overall['summary']}")

    # Individual service statuses
    for service_name, service_status in status.body["status"]["statuses"].items():
        level = service_status["level"]
        summary = service_status.get("summary", "")
        print(f"{service_name}: {level} - {summary}")
Example status levels:
available: Service is fully operational
degraded: Service is operational but with issues
unavailable: Service is not operational
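The per-service entries can be filtered to find which services are pulling the overall level down. The following is a minimal sketch, assuming the response body is a plain dict with the shape described in this section; `unhealthy_services` is a hypothetical helper and not part of the client, and the sample data is hand-written for illustration.

```python
def unhealthy_services(body):
    """Return {service_name: (level, summary)} for services that are not available.

    `body` is assumed to be a dict shaped like status.body in this section.
    """
    statuses = body.get("status", {}).get("statuses", {})
    return {
        name: (info.get("level", "unknown"), info.get("summary", ""))
        for name, info in statuses.items()
        if info.get("level") != "available"
    }


# Hand-written sample data, not real Kibana output.
sample_body = {
    "status": {
        "overall": {"level": "degraded", "summary": "1 service is degraded"},
        "statuses": {
            "elasticsearch": {"level": "available", "summary": "ok"},
            "savedObjects": {"level": "degraded", "summary": "migrations pending"},
        },
    }
}

problems = unhealthy_services(sample_body)
for name, (level, summary) in problems.items():
    print(f"{name}: {level} - {summary}")
```

With a live client, the same helper would presumably be called as `unhealthy_services(status.body)`.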
Retrieving Operational Statistics
Get detailed operational metrics with the get_stats() method:

    # Get operational statistics
    stats = client.status.get_stats()

    # Process information
    process = stats.body["process"]
    print(f"Uptime: {process['uptime_in_millis']} ms")
    print(f"Memory usage: {process['memory']['heap']['used_in_bytes']} bytes")

    # OS information
    os_info = stats.body["os"]
    print(f"Platform: {os_info['platform']}")
    print(f"Load average: {os_info['load']}")

    # Response times
    response_times = stats.body["response_times"]
    print(f"Average response time: {response_times['avg_in_millis']} ms")
    print(f"Max response time: {response_times['max_in_millis']} ms")
Statistics Response Structure
The statistics response includes:
Process metrics: Uptime, memory usage, event loop delay
OS metrics: Platform, CPU count, load average, memory
Response times: Average, max response times
Requests: Total requests, disconnects, status codes
Concurrent connections: Current connection count
    stats = client.status.get_stats()

    # Memory usage
    heap = stats.body["process"]["memory"]["heap"]
    print(f"Heap used: {heap['used_in_bytes'] / 1024 / 1024:.2f} MB")
    print(f"Heap total: {heap['total_in_bytes'] / 1024 / 1024:.2f} MB")
    print(f"Heap limit: {heap['size_limit'] / 1024 / 1024:.2f} MB")

    # Request statistics
    requests = stats.body["requests"]
    print(f"Total requests: {requests['total']}")
    print(f"Disconnects: {requests['disconnects']}")

    # Status code breakdown
    for code, count in requests["status_codes"].items():
        print(f"HTTP {code}: {count} requests")
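Raw byte counts and per-code totals are easier to alert on once reduced to ratios. The sketch below assumes the field names used in the example in this section (`used_in_bytes`, `size_limit`, `status_codes`); both helper functions are hypothetical, and the sample values are made up.

```python
def heap_usage_ratio(stats_body):
    """Fraction of the heap limit in use (0.0 to 1.0), or None if unknown."""
    heap = stats_body.get("process", {}).get("memory", {}).get("heap", {})
    used = heap.get("used_in_bytes")
    limit = heap.get("size_limit")
    if used is None or not limit:
        return None
    return used / limit


def server_error_ratio(stats_body):
    """Share of counted requests that ended in a 5xx status, or None if no requests."""
    codes = stats_body.get("requests", {}).get("status_codes", {})
    total = sum(codes.values())
    if total == 0:
        return None
    errors = sum(n for code, n in codes.items() if str(code).startswith("5"))
    return errors / total


# Illustrative values only, not real Kibana output.
sample_stats = {
    "process": {"memory": {"heap": {"used_in_bytes": 256 * 1024**2,
                                    "size_limit": 1024 * 1024**2}}},
    "requests": {"status_codes": {"200": 90, "404": 5, "500": 5}},
}

heap_ratio = heap_usage_ratio(sample_stats)
error_ratio = server_error_ratio(sample_stats)
print(f"Heap in use: {heap_ratio:.0%}, server errors: {error_ratio:.0%}")
```

Returning None rather than 0 when data is missing keeps "no information" distinct from "no usage", which matters when the result feeds an alert threshold.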
Health Check Integration
Use the Status API for health checks and monitoring:
    def check_kibana_health(client):
        """Check if Kibana is healthy."""
        try:
            status = client.status.get_status()
            level = status.body["status"]["overall"]["level"]

            if level == "available":
                return True, "Kibana is healthy"
            elif level == "degraded":
                return False, "Kibana is degraded"
            else:
                return False, "Kibana is unavailable"
        except Exception as e:
            return False, f"Failed to check status: {e}"

    # Use in monitoring
    is_healthy, message = check_kibana_health(client)
    if not is_healthy:
        # Send alert
        print(f"ALERT: {message}")
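When the check runs as a script under a monitoring system, the three status levels map naturally onto the common plugin exit-code convention (0 = OK, 1 = WARNING, 2 = CRITICAL, 3 = UNKNOWN). This is a sketch of one possible mapping; `exit_code_for` is a hypothetical helper, and treating `degraded` as a warning is a design choice rather than anything the Status API prescribes.

```python
# Monitoring-plugin convention: 0 = OK, 1 = WARNING, 2 = CRITICAL, 3 = UNKNOWN.
EXIT_CODES = {"available": 0, "degraded": 1, "unavailable": 2}


def exit_code_for(level):
    """Map a Kibana status level to a plugin exit code (unknown levels give 3)."""
    return EXIT_CODES.get(level, 3)


# In a script this would typically end with something like:
#     level = status.body["status"]["overall"]["level"]
#     sys.exit(exit_code_for(level))
for level in ("available", "degraded", "unavailable", "something-else"):
    print(level, "->", exit_code_for(level))
```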
Monitoring Best Practices
Best practices for using the Status API:
Regular health checks: Poll the status endpoint periodically
Alert on degradation: Set up alerts for degraded or unavailable status
Track metrics over time: Store statistics for trend analysis
Monitor response times: Watch for increasing response times
Check memory usage: Alert on high memory usage
    import time

    def monitor_kibana(client, interval=60):
        """Monitor Kibana health continuously."""
        while True:
            try:
                # Check status
                status = client.status.get_status()
                level = status.body["status"]["overall"]["level"]

                # Get stats
                stats = client.status.get_stats()
                uptime = stats.body["process"]["uptime_in_millis"]
                heap_used = stats.body["process"]["memory"]["heap"]["used_in_bytes"]

                print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}]")
                print(f"  Status: {level}")
                print(f"  Uptime: {uptime / 1000 / 60:.2f} minutes")
                print(f"  Heap: {heap_used / 1024 / 1024:.2f} MB")

                if level != "available":
                    print(f"  WARNING: Kibana is {level}")
            except Exception as e:
                print(f"  ERROR: Failed to get status: {e}")

            time.sleep(interval)
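For the "monitor response times" practice, a single slow sample is usually noise, so it can help to alert on a rolling average instead. The following sketch keeps the last few samples in a fixed-size window; `RollingAverage` and `is_slow` are hypothetical names, and the sample values and threshold are illustrative.

```python
from collections import deque


class RollingAverage:
    """Keep the most recent `size` samples and report their mean."""

    def __init__(self, size=10):
        # deque with maxlen silently drops the oldest sample when full.
        self.samples = deque(maxlen=size)

    def add(self, value):
        self.samples.append(value)

    def mean(self):
        if not self.samples:
            return None
        return sum(self.samples) / len(self.samples)


def is_slow(window, threshold_ms):
    """True when the rolling mean exceeds the threshold."""
    avg = window.mean()
    return avg is not None and avg > threshold_ms


window = RollingAverage(size=3)
for sample_ms in [40, 50, 60, 200]:  # e.g. successive avg_in_millis readings
    window.add(sample_ms)

print(f"Rolling mean: {window.mean():.1f} ms, slow: {is_slow(window, 100)}")
```

Inside a polling loop, each iteration would add the latest response-time reading to the window before checking the threshold.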
Error Handling
Handle errors when checking status:
    from kibana.exceptions import (
        ConnectionError,
        AuthenticationException,
        TransportError
    )

    try:
        status = client.status.get_status()
    except AuthenticationException as e:
        print(f"Authentication failed: {e.message}")
    except ConnectionError as e:
        print(f"Cannot connect to Kibana: {e}")
    except TransportError as e:
        print(f"Transport error: {e}")
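Connection failures are often transient (for example while Kibana restarts), so a health check may want to retry before raising an alert. Below is a minimal sketch of retry with exponential backoff; `call_with_retry` is a hypothetical helper, not part of the client, and the `sleep` parameter exists only so the delay can be replaced in tests.

```python
import time


def call_with_retry(func, attempts=3, base_delay=1.0,
                    retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on a listed exception, wait and retry with a doubled delay."""
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))


# Possible usage with the client (ConnectionError imported from kibana.exceptions):
#     status = call_with_retry(client.status.get_status,
#                              retry_on=(ConnectionError,))
```

Restricting `retry_on` to connection-level errors avoids pointlessly retrying failures that will not fix themselves, such as rejected credentials.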
- __init__(client)
Initialize the StatusClient.
- Parameters:
client (Kibana) – The parent Kibana client instance to delegate requests to.
Example
>>> status_client = StatusClient(kibana_client)
- get_status()
Get the current status of the Kibana server.
Returns comprehensive health information about the Kibana server and its dependencies. This endpoint is commonly used for health checks in monitoring systems and load balancers.
- The response includes:
Overall status level (available, degraded, unavailable)
Individual service statuses (Elasticsearch, SavedObjects, etc.)
Version information (Kibana version, build number)
Server identification (name, UUID)
Detailed status messages for each service
- Returns:
ObjectApiResponse containing status information with the following structure:
status.overall.level: Overall status level
status.overall.summary: Human-readable status summary
status.statuses: Dictionary of individual service statuses
version: Kibana version information
name: Server name
uuid: Server UUID
- Return type:
ObjectApiResponse
- Raises:
AuthenticationException – If authentication fails.
AuthorizationException – If insufficient privileges to view status.
TransportError – If unable to connect to Kibana.
Example
>>> # Basic status check
>>> status = client.status.get_status()
>>> if status.body["status"]["overall"]["level"] == "available":
...     print("Kibana is healthy")
... else:
...     print("Kibana has issues")
Kibana is healthy
>>>
>>> # Check specific service status
>>> es_status = status.body["status"]["statuses"]["elasticsearch"]
>>> print(f"Elasticsearch: {es_status['level']}")
Elasticsearch: available
>>> if es_status['level'] != 'available':
...     print(f"Issue: {es_status.get('summary', 'Unknown')}")
>>>
>>> # Get version information
>>> version = status.body["version"]
>>> print(f"Kibana {version['number']} (build {version['build_number']})")
Kibana 8.11.0 (build 12345)
- get_stats()
Get operational statistics about the Kibana server.
Returns detailed performance and resource utilization metrics for the Kibana server. This is useful for monitoring, capacity planning, and performance troubleshooting.
- The response includes:
Process metrics (memory usage, uptime, event loop delay)
OS metrics (platform, CPU load, memory, uptime)
HTTP metrics (response times, request counts, connections)
Request statistics (total, disconnects, status codes)
- Returns:
ObjectApiResponse containing statistics with the following structure:
process: Process-level metrics (memory, uptime, event loop)
os: Operating system metrics (platform, load, memory)
response_times: HTTP response time statistics
requests: Request count statistics
concurrent_connections: Current connection count
- Return type:
ObjectApiResponse
- Raises:
AuthenticationException – If authentication fails.
AuthorizationException – If insufficient privileges to view stats.
TransportError – If unable to connect to Kibana.
Example
>>> # Get server statistics
>>> stats = client.status.get_stats()
>>>
>>> # Check memory usage
>>> heap = stats.body["process"]["memory"]["heap"]
>>> used_mb = heap["used_in_bytes"] / (1024 * 1024)
>>> total_mb = heap["total_in_bytes"] / (1024 * 1024)
>>> print(f"Heap: {used_mb:.1f}MB / {total_mb:.1f}MB")
Heap: 245.3MB / 512.0MB
>>>
>>> # Check uptime
>>> uptime_sec = stats.body["process"]["uptime_in_millis"] / 1000
>>> uptime_hours = uptime_sec / 3600
>>> print(f"Uptime: {uptime_hours:.1f} hours")
Uptime: 24.5 hours
>>>
>>> # Check response times
>>> response_times = stats.body.get("response_times", {})
>>> if "avg_in_millis" in response_times:
...     print(f"Avg response time: {response_times['avg_in_millis']}ms")
Avg response time: 45ms
>>>
>>> # Check concurrent connections
>>> connections = stats.body.get("concurrent_connections", 0)
>>> print(f"Active connections: {connections}")
Active connections: 12
AsyncStatusClient
Asynchronous version of the StatusClient for use with async/await syntax.
- class kibana._async.client.status.AsyncStatusClient(client)
Bases: AsyncNamespaceClient
Async client for Kibana Status API operations.
Usage
The AsyncStatusClient provides the same methods as StatusClient but all methods are async and must be awaited:
    from kibana import AsyncKibana
    import asyncio

    async def main():
        async with AsyncKibana("http://localhost:5601") as client:
            # Get status (async)
            status = await client.status.get_status()
            print(f"Status: {status.body['status']['overall']['level']}")

            # Get stats (async)
            stats = await client.status.get_stats()
            print(f"Uptime: {stats.body['process']['uptime_in_millis']} ms")

    asyncio.run(main())
Concurrent Monitoring
Monitor multiple Kibana instances concurrently:
    import asyncio
    from kibana import AsyncKibana

    async def check_instance(url):
        """Check status of a single Kibana instance."""
        async with AsyncKibana(url) as client:
            status = await client.status.get_status()
            return {
                "url": url,
                "level": status.body["status"]["overall"]["level"]
            }

    async def monitor_cluster():
        """Monitor multiple Kibana instances."""
        instances = [
            "http://kibana1:5601",
            "http://kibana2:5601",
            "http://kibana3:5601"
        ]

        # Check all instances concurrently
        results = await asyncio.gather(
            *[check_instance(url) for url in instances],
            return_exceptions=True
        )

        for result in results:
            if isinstance(result, Exception):
                print(f"Error: {result}")
            else:
                print(f"{result['url']}: {result['level']}")

    asyncio.run(monitor_cluster())
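With `return_exceptions=True`, a failed check comes back as a bare exception, so the error message alone does not say which instance failed. Because `asyncio.gather()` returns results in the same order as its arguments, results can be paired back with their URLs. The sketch below shows this; `gather_by_url` is a hypothetical helper, and `fake_check` is a stand-in for a real per-instance check so the example runs without a server.

```python
import asyncio


async def gather_by_url(urls, check):
    """Run check(url) for all URLs concurrently; return (ok, failed) keyed by URL."""
    results = await asyncio.gather(
        *(check(url) for url in urls),
        return_exceptions=True,
    )
    ok, failed = {}, {}
    # gather() preserves argument order, so zip() pairs each result with its URL.
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            failed[url] = result
        else:
            ok[url] = result
    return ok, failed


async def fake_check(url):
    """Stand-in for a real status check; fails for one hypothetical host."""
    await asyncio.sleep(0)
    if "kibana2" in url:
        raise OSError("connection refused")
    return "available"


urls = ["http://kibana1:5601", "http://kibana2:5601"]
ok, failed = asyncio.run(gather_by_url(urls, fake_check))
for url, level in ok.items():
    print(f"{url}: {level}")
for url, error in failed.items():
    print(f"{url}: ERROR {error}")
```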
Async Health Monitoring
Implement continuous async health monitoring:
    import asyncio
    from kibana import AsyncKibana

    async def monitor_health(client, interval=60):
        """Monitor Kibana health continuously (async)."""
        while True:
            try:
                # Get status and stats concurrently
                status, stats = await asyncio.gather(
                    client.status.get_status(),
                    client.status.get_stats()
                )

                level = status.body["status"]["overall"]["level"]
                uptime = stats.body["process"]["uptime_in_millis"]

                print(f"Status: {level}, Uptime: {uptime / 1000:.2f}s")

                if level != "available":
                    print(f"WARNING: Kibana is {level}")
            except Exception as e:
                print(f"ERROR: {e}")

            await asyncio.sleep(interval)

    async def main():
        async with AsyncKibana("http://localhost:5601") as client:
            await monitor_health(client, interval=30)

    asyncio.run(main())
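A monitoring loop can stall if a single request hangs, so it may be worth bounding each check with a timeout. The following sketch wraps any awaitable in `asyncio.wait_for`; `with_timeout` is a hypothetical helper, and `slow_check` and `fast_check` are stand-ins for real client calls.

```python
import asyncio


async def with_timeout(awaitable, seconds, default=None):
    """Await `awaitable`; return `default` if it does not finish in time."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        return default


async def slow_check():
    """Stand-in for a request that hangs."""
    await asyncio.sleep(1)
    return "available"


async def fast_check():
    """Stand-in for a request that answers immediately."""
    return "available"


async def demo():
    slow = await with_timeout(slow_check(), seconds=0.05, default="timeout")
    fast = await with_timeout(fast_check(), seconds=0.05, default="timeout")
    return slow, fast


result = asyncio.run(demo())
print(result)
```

In the monitoring loop, the awaited call would presumably become something like `await with_timeout(client.status.get_status(), seconds=10)`, with a None result treated as a failed check.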
- __init__(client)
Initialize the AsyncStatusClient.
- Parameters:
client (AsyncKibana) – The parent AsyncKibana client instance
- async get_status()
Get the current status of the Kibana server.
- Returns information about the Kibana server status, including:
Overall status level (available, degraded, unavailable)
Individual service statuses (Elasticsearch, SavedObjects, etc.)
Version information
Server name and UUID
- Returns:
ObjectApiResponse containing status information
- Return type:
ObjectApiResponse
- async get_stats()
Get operational statistics about the Kibana server.
- Returns detailed metrics, including:
Process information (memory usage, uptime)
OS information (platform, load, memory)
Response times and request counts
Concurrent connections
- Returns:
ObjectApiResponse containing statistics
- Return type:
ObjectApiResponse