StatusClient

Client for monitoring Kibana server health and statistics through the Status API.

The Status API provides information about the Kibana server’s operational state, including overall health status, individual service statuses, and detailed operational metrics.

class kibana._sync.client.status.StatusClient(client)[source]

Bases: NamespaceClient

Client for Kibana Status API operations.

The Status API provides health and operational information about the Kibana server and its dependencies. This is useful for monitoring, health checks, and troubleshooting.

Status levels:
  • available: All services are operational

  • degraded: Some services are experiencing issues but Kibana is functional

  • unavailable: Critical services are down, Kibana may not be functional

Example

>>> from kibana import Kibana
>>> client = Kibana("http://localhost:5601", api_key="...")
>>>
>>> # Check overall status
>>> status = client.status.get_status()
>>> print(status.body["status"]["overall"]["level"])
available
>>>
>>> # Check individual service statuses
>>> for service, info in status.body["status"]["statuses"].items():
...     print(f"{service}: {info['level']}")
elasticsearch: available
savedObjects: available
>>>
>>> # Get operational statistics
>>> stats = client.status.get_stats()
>>> print(stats.body["process"]["memory"]["heap"]["used_in_bytes"])

Overview

The StatusClient provides methods to check Kibana server health and retrieve operational statistics. This is useful for monitoring, alerting, and health checks.

Checking Server Status

Get the current Kibana server status with the get_status() method:

from kibana import Kibana

client = Kibana("http://localhost:5601")

# Get server status
status = client.status.get_status()

# Check overall status level
overall_status = status.body["status"]["overall"]["level"]
print(f"Kibana status: {overall_status}")

# Status levels: "available", "degraded", or "unavailable"
if overall_status == "available":
    print("✓ Kibana is healthy")
elif overall_status == "degraded":
    print("⚠ Kibana is degraded")
else:
    print("✗ Kibana is unavailable")

Status Response Structure

The status response includes detailed information about each service:

status = client.status.get_status()

# Overall status
overall = status.body["status"]["overall"]
print(f"Overall: {overall['level']} - {overall['summary']}")

# Individual service statuses
for service_name, service_status in status.body["status"]["statuses"].items():
    level = service_status["level"]
    summary = service_status.get("summary", "")
    print(f"{service_name}: {level} - {summary}")

Example status levels:

  • available - Service is fully operational

  • degraded - Service is operational but with issues

  • unavailable - Service is not operational

Retrieving Operational Statistics

Get detailed operational metrics with the get_stats() method:

# Get operational statistics
stats = client.status.get_stats()

# Process information
process = stats.body["process"]
print(f"Uptime: {process['uptime_in_millis']} ms")
print(f"Memory usage: {process['memory']['heap']['used_in_bytes']} bytes")

# OS information
os_info = stats.body["os"]
print(f"Platform: {os_info['platform']}")
print(f"Load average: {os_info['load']}")

# Response times
response_times = stats.body["response_times"]
print(f"Average response time: {response_times['avg_in_millis']} ms")
print(f"Max response time: {response_times['max_in_millis']} ms")

Statistics Response Structure

The statistics response includes:

  • Process metrics: Uptime, memory usage, event loop delay

  • OS metrics: Platform, CPU count, load average, memory

  • Response times: Average, max response times

  • Requests: Total requests, disconnects, status codes

  • Concurrent connections: Current connection count

stats = client.status.get_stats()

# Memory usage
heap = stats.body["process"]["memory"]["heap"]
print(f"Heap used: {heap['used_in_bytes'] / 1024 / 1024:.2f} MB")
print(f"Heap total: {heap['total_in_bytes'] / 1024 / 1024:.2f} MB")
print(f"Heap limit: {heap['size_limit'] / 1024 / 1024:.2f} MB")

# Request statistics
requests = stats.body["requests"]
print(f"Total requests: {requests['total']}")
print(f"Disconnects: {requests['disconnects']}")

# Status code breakdown
for code, count in requests["status_codes"].items():
    print(f"HTTP {code}: {count} requests")

Health Check Integration

Use the Status API for health checks and monitoring:

def check_kibana_health(client):
    """Check if Kibana is healthy."""
    try:
        status = client.status.get_status()
        level = status.body["status"]["overall"]["level"]

        if level == "available":
            return True, "Kibana is healthy"
        elif level == "degraded":
            return False, "Kibana is degraded"
        else:
            return False, "Kibana is unavailable"
    except Exception as e:
        return False, f"Failed to check status: {e}"

# Use in monitoring
is_healthy, message = check_kibana_health(client)
if not is_healthy:
    # Send alert
    print(f"ALERT: {message}")

Monitoring Best Practices

Best practices for using the Status API:

  1. Regular health checks: Poll the status endpoint periodically

  2. Alert on degradation: Set up alerts for degraded or unavailable status

  3. Track metrics over time: Store statistics for trend analysis

  4. Monitor response times: Watch for increasing response times

  5. Check memory usage: Alert on high memory usage

import time

def monitor_kibana(client, interval=60):
    """Monitor Kibana health continuously."""
    while True:
        try:
            # Check status
            status = client.status.get_status()
            level = status.body["status"]["overall"]["level"]

            # Get stats
            stats = client.status.get_stats()
            uptime = stats.body["process"]["uptime_in_millis"]
            heap_used = stats.body["process"]["memory"]["heap"]["used_in_bytes"]

            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}]")
            print(f"  Status: {level}")
            print(f"  Uptime: {uptime / 1000 / 60:.2f} minutes")
            print(f"  Heap: {heap_used / 1024 / 1024:.2f} MB")

            if level != "available":
                print(f"  WARNING: Kibana is {level}")

        except Exception as e:
            print(f"  ERROR: Failed to get status: {e}")

        time.sleep(interval)

Error Handling

Handle errors when checking status:

from kibana.exceptions import (
    ConnectionError,
    AuthenticationException,
    TransportError
)

try:
    status = client.status.get_status()
except AuthenticationException as e:
    print(f"Authentication failed: {e.message}")
except ConnectionError as e:
    print(f"Cannot connect to Kibana: {e}")
except TransportError as e:
    print(f"Transport error: {e}")
__init__(client)[source]

Initialize the StatusClient.

Parameters:

client (Kibana) – The parent Kibana client instance to delegate requests to.

Example

>>> status_client = StatusClient(kibana_client)
get_status()[source]

Get the current status of the Kibana server.

Returns comprehensive health information about the Kibana server and its dependencies. This endpoint is commonly used for health checks in monitoring systems and load balancers.

The response includes:
  • Overall status level (available, degraded, unavailable)

  • Individual service statuses (Elasticsearch, SavedObjects, etc.)

  • Version information (Kibana version, build number)

  • Server identification (name, UUID)

  • Detailed status messages for each service

Returns:

  • status.overall.level: Overall status level

  • status.overall.summary: Human-readable status summary

  • status.statuses: Dictionary of individual service statuses

  • version: Kibana version information

  • name: Server name

  • uuid: Server UUID

Return type:

ObjectApiResponse containing status information with the following structure

Raises:

Example

>>> # Basic status check
>>> status = client.status.get_status()
>>> if status.body["status"]["overall"]["level"] == "available":
...     print("Kibana is healthy")
... else:
...     print("Kibana has issues")
Kibana is healthy
>>>
>>> # Check specific service status
>>> es_status = status.body["status"]["statuses"]["elasticsearch"]
>>> print(f"Elasticsearch: {es_status['level']}")
>>> if es_status['level'] != 'available':
...     print(f"Issue: {es_status.get('summary', 'Unknown')}")
Elasticsearch: available
>>>
>>> # Get version information
>>> version = status.body["version"]
>>> print(f"Kibana {version['number']} (build {version['build_number']})")
Kibana 8.11.0 (build 12345)
get_stats()[source]

Get operational statistics about the Kibana server.

Returns detailed performance and resource utilization metrics for the Kibana server. This is useful for monitoring, capacity planning, and performance troubleshooting.

The response includes:
  • Process metrics (memory usage, uptime, event loop delay)

  • OS metrics (platform, CPU load, memory, uptime)

  • HTTP metrics (response times, request counts, connections)

  • Request statistics (total, disconnects, status codes)

Returns:

  • process: Process-level metrics (memory, uptime, event loop)

  • os: Operating system metrics (platform, load, memory)

  • response_times: HTTP response time statistics

  • requests: Request count statistics

  • concurrent_connections: Current connection count

Return type:

ObjectApiResponse containing statistics with the following structure

Raises:

Example

>>> # Get server statistics
>>> stats = client.status.get_stats()
>>>
>>> # Check memory usage
>>> heap = stats.body["process"]["memory"]["heap"]
>>> used_mb = heap["used_in_bytes"] / (1024 * 1024)
>>> total_mb = heap["total_in_bytes"] / (1024 * 1024)
>>> print(f"Heap: {used_mb:.1f}MB / {total_mb:.1f}MB")
Heap: 245.3MB / 512.0MB
>>>
>>> # Check uptime
>>> uptime_sec = stats.body["process"]["uptime_in_millis"] / 1000
>>> uptime_hours = uptime_sec / 3600
>>> print(f"Uptime: {uptime_hours:.1f} hours")
Uptime: 24.5 hours
>>>
>>> # Check response times
>>> response_times = stats.body.get("response_times", {})
>>> if "avg_in_millis" in response_times:
...     print(f"Avg response time: {response_times['avg_in_millis']}ms")
Avg response time: 45ms
>>>
>>> # Check concurrent connections
>>> connections = stats.body.get("concurrent_connections", 0)
>>> print(f"Active connections: {connections}")
Active connections: 12
perform_request(method, path, *, params=None, headers=None, body=None)

Perform an HTTP request via the parent client with space context enhancement.

Parameters:
  • method (str) – HTTP method (GET, POST, PUT, DELETE, etc.)

  • path (str) – API endpoint path

  • params (dict[str, Any] | None) – Query parameters

  • headers (dict[str, str] | None) – Request headers

  • body (dict[str, Any] | None) – Request body

Returns:

API response

Raises:

ApiError – If the API returns an error response (enhanced with space context)

Return type:

ObjectApiResponse[Any]

AsyncStatusClient

Asynchronous version of the StatusClient for use with async/await syntax.

class kibana._async.client.status.AsyncStatusClient(client)[source]

Bases: AsyncNamespaceClient

Async client for Kibana Status API operations.

Usage

The AsyncStatusClient provides the same methods as StatusClient but all methods are async and must be awaited:

from kibana import AsyncKibana
import asyncio

async def main():
    async with AsyncKibana("http://localhost:5601") as client:
        # Get status (async)
        status = await client.status.get_status()
        print(f"Status: {status.body['status']['overall']['level']}")

        # Get stats (async)
        stats = await client.status.get_stats()
        print(f"Uptime: {stats.body['process']['uptime_in_millis']} ms")

asyncio.run(main())

Concurrent Monitoring

Monitor multiple Kibana instances concurrently:

import asyncio
from kibana import AsyncKibana

async def check_instance(url):
    """Check status of a single Kibana instance."""
    async with AsyncKibana(url) as client:
        status = await client.status.get_status()
        return {
            "url": url,
            "level": status.body["status"]["overall"]["level"]
        }

async def monitor_cluster():
    """Monitor multiple Kibana instances."""
    instances = [
        "http://kibana1:5601",
        "http://kibana2:5601",
        "http://kibana3:5601"
    ]

    # Check all instances concurrently
    results = await asyncio.gather(
        *[check_instance(url) for url in instances],
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"{result['url']}: {result['level']}")

asyncio.run(monitor_cluster())

Async Health Monitoring

Implement continuous async health monitoring:

import asyncio
from kibana import AsyncKibana

async def monitor_health(client, interval=60):
    """Monitor Kibana health continuously (async)."""
    while True:
        try:
            # Get status and stats concurrently
            status, stats = await asyncio.gather(
                client.status.get_status(),
                client.status.get_stats()
            )

            level = status.body["status"]["overall"]["level"]
            uptime = stats.body["process"]["uptime_in_millis"]

            print(f"Status: {level}, Uptime: {uptime / 1000:.2f}s")

            if level != "available":
                print(f"WARNING: Kibana is {level}")

        except Exception as e:
            print(f"ERROR: {e}")

        await asyncio.sleep(interval)

async def main():
    async with AsyncKibana("http://localhost:5601") as client:
        await monitor_health(client, interval=30)

asyncio.run(main())
__init__(client)[source]

Initialize the AsyncStatusClient.

Parameters:

client (AsyncKibana) – The parent AsyncKibana client instance

async get_status()[source]

Get the current status of the Kibana server.

Returns information about the Kibana server status including: - Overall status level (available, degraded, unavailable) - Individual service statuses (Elasticsearch, SavedObjects, etc.) - Version information - Server name and UUID

Returns:

ObjectApiResponse containing status information

Return type:

ObjectApiResponse[dict[str, Any]]

async get_stats()[source]

Get operational statistics about the Kibana server.

Returns detailed metrics including: - Process information (memory usage, uptime) - OS information (platform, load, memory) - Response times and request counts - Concurrent connections

Returns:

ObjectApiResponse containing statistics

Return type:

ObjectApiResponse[dict[str, Any]]

async perform_request(method, path, *, params=None, headers=None, body=None)

Perform an async HTTP request via the parent client with space context enhancement.

Parameters:
  • method (str) – HTTP method (GET, POST, PUT, DELETE, etc.)

  • path (str) – API endpoint path

  • params (dict[str, Any] | None) – Query parameters

  • headers (dict[str, str] | None) – Request headers

  • body (dict[str, Any] | None) – Request body

Returns:

API response

Raises:

ApiError – If the API returns an error response (enhanced with space context)

Return type:

ObjectApiResponse[Any]