Advanced Usage

This guide covers advanced features and patterns for using the Kibana Python client effectively in production environments.

Per-Request Configuration

The options() method allows you to modify client settings for specific requests without creating a new client instance. This is useful for one-off configuration changes or when different operations require different settings.

Request Timeout

Override the default timeout for specific requests:

from kibana import Kibana

client = Kibana(
    "http://localhost:5601",
    api_key="your_api_key",
    request_timeout=10.0  # Default 10 seconds
)

# Use longer timeout for potentially slow operation
result = client.options(request_timeout=60.0).actions.get_all()

# Use shorter timeout for quick health check
status = client.options(request_timeout=5.0).status.get_status()

Custom Headers

Add custom headers for specific requests:

# Add custom headers for request tracking
result = client.options(
    headers={
        "X-Request-ID": "unique-request-id",
        "X-Custom-Header": "custom-value"
    }
).actions.get_all()

# Add headers for debugging
result = client.options(
    headers={"X-Debug": "true"}
).spaces.get_all()

Per-Request Authentication

Override authentication for specific requests:

# Initialize with default authentication
client = Kibana(
    "http://localhost:5601",
    api_key="default_api_key"
)

# Use different API key for specific request
result = client.options(
    api_key="admin_api_key"
).actions.create(
    name="Admin Connector",
    connector_type_id=".index",
    config={"index": "admin-logs"}
)

# Use basic auth for specific request
result = client.options(
    basic_auth=("admin", "admin_password")
).spaces.create(
    id="admin-space",
    name="Admin Space"
)

Chaining Options

Options can be chained for complex configurations:

# Combine multiple options
result = client.options(
    request_timeout=30.0,
    headers={"X-Request-ID": "123"}
).options(
    api_key="different_key"
).actions.get_all()

Note

Each call to options() creates a new client instance with the modified settings. The original client remains unchanged.

Connection Pooling

The Kibana client uses elastic-transport for connection management, which provides built-in connection pooling.

Connection Pool Configuration

from kibana import Kibana

client = Kibana(
    "http://localhost:5601",
    api_key="your_api_key",

    # Connection pool settings
    connections_per_node=10,  # Max connections per node (default: 10)
    max_retries=3,            # Max retry attempts (default: 3)
    retry_on_timeout=True,    # Retry on timeout (default: True)
)

Multiple Hosts

Configure multiple Kibana hosts for high availability:

client = Kibana(
    hosts=[
        "http://kibana1.example.com:5601",
        "http://kibana2.example.com:5601",
        "http://kibana3.example.com:5601"
    ],
    api_key="your_api_key"
)

The client will automatically distribute requests across available hosts and handle failover.

Connection Lifecycle

from kibana import Kibana

# Create client (connections are lazy-initialized)
client = Kibana("http://localhost:5601", api_key="your_api_key")

# Make requests (connections are created as needed)
status = client.status.get_status()

# Close client when done (releases connections)
client.close()

Using Context Managers

Context managers automatically handle connection lifecycle:

# Synchronous client
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
    status = client.status.get_status()
    # Client is automatically closed when exiting the context

# Asynchronous client
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
    status = await client.status.get_status()
    # Client is automatically closed when exiting the context

Retry Configuration

Configure retry behavior for failed requests:

Basic Retry Configuration

from kibana import Kibana

client = Kibana(
    "http://localhost:5601",
    api_key="your_api_key",

    # Retry settings
    max_retries=3,                    # Number of retry attempts
    retry_on_timeout=True,            # Retry on timeout errors
    retry_on_status=[502, 503, 504],  # Retry on these HTTP status codes
)

Custom Retry Logic

For more complex retry scenarios, implement custom retry logic:

import time
from kibana import Kibana
from kibana.exceptions import ApiError

def execute_with_retry(client, operation, max_attempts=3, backoff_factor=2):
    """Execute operation with exponential backoff retry."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except ApiError as e:
            if attempt == max_attempts - 1:
                raise

            # Calculate backoff time
            wait_time = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)

# Usage
client = Kibana("http://localhost:5601", api_key="your_api_key")

result = execute_with_retry(
    client,
    lambda: client.actions.create(
        name="My Connector",
        connector_type_id=".index",
        config={"index": "logs"}
    )
)

TLS/SSL Configuration

Configure TLS/SSL settings for secure connections:

Basic TLS

from kibana import Kibana

client = Kibana(
    "https://kibana.example.com:5601",
    api_key="your_api_key",
    verify_certs=True  # Verify SSL certificates (default: True)
)

Custom CA Certificate

Provide a custom CA certificate bundle:

client = Kibana(
    "https://kibana.example.com:5601",
    api_key="your_api_key",
    ca_certs="/path/to/ca-bundle.crt"  # Path to CA certificate
)

Client Certificates

Use client certificates for mutual TLS authentication:

client = Kibana(
    "https://kibana.example.com:5601",
    api_key="your_api_key",
    client_cert="/path/to/client.crt",  # Client certificate
    client_key="/path/to/client.key"    # Client private key
)

Disable Certificate Verification

Warning

Only disable certificate verification for local development or testing. Never use this in production.

client = Kibana(
    "https://localhost:5601",
    api_key="your_api_key",
    verify_certs=False  # Disable certificate verification
)

SSL Context

For advanced SSL configuration, provide a custom SSL context:

import ssl
from kibana import Kibana

# Create custom SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

client = Kibana(
    "https://localhost:5601",
    api_key="your_api_key",
    ssl_context=ssl_context
)

Async Client Patterns

The async client provides the same API as the synchronous client but with async/await support.

Basic Async Usage

import asyncio
from kibana import AsyncKibana

async def main():
    client = AsyncKibana(
        "http://localhost:5601",
        api_key="your_api_key"
    )

    try:
        # All methods are async
        status = await client.status.get_status()
        print(status.body)
    finally:
        await client.close()

asyncio.run(main())

Async Context Manager

async def main():
    async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
        status = await client.status.get_status()
        connectors = await client.actions.get_all()
        # Client is automatically closed

Concurrent Operations

Execute multiple operations concurrently:

import asyncio
from kibana import AsyncKibana

async def main():
    async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
        # Execute multiple operations concurrently
        status, spaces, connectors = await asyncio.gather(
            client.status.get_status(),
            client.spaces.get_all(),
            client.actions.get_all()
        )

        print(f"Status: {status.body['status']['overall']['level']}")
        print(f"Spaces: {len(spaces.body)}")
        print(f"Connectors: {len(connectors.body)}")

asyncio.run(main())

Concurrent Requests with Error Handling

async def main():
    async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
        # Execute with error handling
        results = await asyncio.gather(
            client.status.get_status(),
            client.spaces.get_all(),
            client.actions.get_all(),
            return_exceptions=True  # Don't fail on first exception
        )

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Operation {i} failed: {result}")
            else:
                print(f"Operation {i} succeeded")

Async Batch Processing

Process items in batches asynchronously:

async def process_connectors_batch(client, connector_ids):
    """Process a batch of connectors concurrently."""
    tasks = [
        client.actions.get(id=connector_id)
        for connector_id in connector_ids
    ]
    return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
        # Get all connector IDs
        all_connectors = await client.actions.get_all()
        connector_ids = [c["id"] for c in all_connectors.body]

        # Process in batches of 10
        batch_size = 10
        for i in range(0, len(connector_ids), batch_size):
            batch = connector_ids[i:i + batch_size]
            results = await process_connectors_batch(client, batch)
            print(f"Processed batch {i//batch_size + 1}: {len(results)} connectors")

Async Rate Limiting

Implement rate limiting for async operations:

import asyncio
from asyncio import Semaphore

async def rate_limited_operation(client, semaphore, connector_id):
    """Execute operation with rate limiting."""
    async with semaphore:
        return await client.actions.get(id=connector_id)

async def main():
    async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
        # Limit to 5 concurrent requests
        semaphore = Semaphore(5)

        all_connectors = await client.actions.get_all()
        connector_ids = [c["id"] for c in all_connectors.body]

        # Execute with rate limiting
        tasks = [
            rate_limited_operation(client, semaphore, connector_id)
            for connector_id in connector_ids
        ]
        results = await asyncio.gather(*tasks)
        print(f"Processed {len(results)} connectors with rate limiting")

Performance Optimization

Connection Reuse

Reuse client instances to benefit from connection pooling:

# Good: Reuse client
client = Kibana("http://localhost:5601", api_key="your_api_key")

for i in range(100):
    result = client.actions.get_all()
    # Process result

client.close()

# Avoid: Creating new client for each request
for i in range(100):
    client = Kibana("http://localhost:5601", api_key="your_api_key")
    result = client.actions.get_all()
    client.close()  # Inefficient!

Batch Operations

Use bulk operations when available:

# Good: Bulk create
objects = [
    {"type": "config", "attributes": {"title": f"Config {i}"}}
    for i in range(100)
]
result = client.saved_objects.bulk_create(objects)

# Avoid: Individual creates
for i in range(100):
    client.saved_objects.create(
        type="config",
        attributes={"title": f"Config {i}"}
    )

Disable Space Validation

For performance-critical scenarios, disable automatic space validation:

# Create space-scoped client without validation
fast_client = client.space("marketing", validate=False)

# Validation is skipped for all operations
connector = fast_client.actions.create(
    name="Fast Connector",
    connector_type_id=".index",
    config={"index": "logs"}
)

Warning

Only disable validation when you’re certain the space exists. Invalid space IDs will result in API errors.

Caching

Implement caching for frequently accessed data:

from functools import lru_cache
from kibana import Kibana

class CachedKibanaClient:
    def __init__(self, client):
        self.client = client

    @lru_cache(maxsize=128)
    def get_connector(self, connector_id):
        """Get connector with caching."""
        response = self.client.actions.get(id=connector_id)
        return response.body

    @lru_cache(maxsize=32)
    def get_space(self, space_id):
        """Get space with caching."""
        response = self.client.spaces.get(id=space_id)
        return response.body

# Usage
client = Kibana("http://localhost:5601", api_key="your_api_key")
cached_client = CachedKibanaClient(client)

# First call hits API
connector = cached_client.get_connector("connector-id")

# Second call uses cache
connector = cached_client.get_connector("connector-id")  # Fast!

Async for I/O-Bound Operations

Use async client for I/O-bound operations:

import asyncio
from kibana import AsyncKibana

async def fetch_all_data(client):
    """Fetch multiple resources concurrently."""
    return await asyncio.gather(
        client.status.get_status(),
        client.spaces.get_all(),
        client.actions.get_all(),
        client.saved_objects.find(type="dashboard")
    )

async def main():
    async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
        # Much faster than sequential requests
        status, spaces, connectors, dashboards = await fetch_all_data(client)

Request Timeout Tuning

Tune timeouts based on operation type:

client = Kibana("http://localhost:5601", api_key="your_api_key")

# Fast operations: short timeout
status = client.options(request_timeout=5.0).status.get_status()

# Bulk operations: longer timeout
result = client.options(request_timeout=60.0).saved_objects.bulk_create(objects)

# Export operations: very long timeout
export = client.options(request_timeout=300.0).saved_objects.export(
    type=["dashboard", "visualization"]
)

Logging and Debugging

Enable Debug Logging

import logging
from kibana import Kibana

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# The client uses the "kibana" logger
logger = logging.getLogger("kibana")
logger.setLevel(logging.DEBUG)

client = Kibana("http://localhost:5601", api_key="your_api_key")

# All requests will be logged with details
status = client.status.get_status()

Custom Logging Handler

import logging
from kibana import Kibana

class RequestLogger(logging.Handler):
    """Custom handler to log requests to a file."""

    def __init__(self, filename):
        super().__init__()
        self.file = open(filename, 'a')

    def emit(self, record):
        self.file.write(f"{record.getMessage()}\n")
        self.file.flush()

# Add custom handler
logger = logging.getLogger("kibana")
logger.addHandler(RequestLogger("kibana_requests.log"))
logger.setLevel(logging.DEBUG)

client = Kibana("http://localhost:5601", api_key="your_api_key")

Request/Response Inspection

from kibana import Kibana

client = Kibana("http://localhost:5601", api_key="your_api_key")

# Make request
response = client.actions.get_all()

# Inspect response metadata
print(f"Status: {response.meta.status}")
print(f"Headers: {response.meta.headers}")
print(f"Duration: {response.meta.duration}")

# Inspect response body
print(f"Body: {response.body}")

Error Handling Patterns

Retry with Exponential Backoff

import time
from kibana import Kibana
from kibana.exceptions import ApiError, ServerError

def retry_with_backoff(func, max_attempts=3, base_delay=1):
    """Retry function with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except ServerError as e:
            if attempt == max_attempts - 1:
                raise

            delay = base_delay * (2 ** attempt)
            print(f"Server error, retrying in {delay}s...")
            time.sleep(delay)

client = Kibana("http://localhost:5601", api_key="your_api_key")

result = retry_with_backoff(
    lambda: client.actions.get_all()
)

Graceful Degradation

from kibana import Kibana
from kibana.exceptions import ApiError

client = Kibana("http://localhost:5601", api_key="your_api_key")

try:
    # Try to get connectors
    connectors = client.actions.get_all()
    print(f"Found {len(connectors.body)} connectors")
except ApiError as e:
    # Fall back to empty list
    print(f"Failed to fetch connectors: {e.message}")
    connectors = []

Circuit Breaker Pattern

import time
from kibana import Kibana
from kibana.exceptions import ApiError

class CircuitBreaker:
    """Simple circuit breaker implementation."""

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = func()
            self.on_success()
            return result
        except ApiError as e:
            self.on_failure()
            raise

    def on_success(self):
        self.failures = 0
        self.state = "closed"

    def on_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

# Usage
client = Kibana("http://localhost:5601", api_key="your_api_key")
breaker = CircuitBreaker()

try:
    result = breaker.call(lambda: client.actions.get_all())
except Exception as e:
    print(f"Request failed: {e}")

Best Practices

1. Use Context Managers

Always use context managers to ensure proper cleanup:

# Good
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
    result = client.actions.get_all()

# Avoid
client = Kibana("http://localhost:5601", api_key="your_api_key")
result = client.actions.get_all()
# Forgot to call client.close()!

2. Reuse Client Instances

Create one client instance and reuse it:

# Good
client = Kibana("http://localhost:5601", api_key="your_api_key")
for i in range(100):
    result = client.actions.get_all()
client.close()

# Avoid
for i in range(100):
    client = Kibana("http://localhost:5601", api_key="your_api_key")
    result = client.actions.get_all()
    client.close()

3. Use Async for Concurrent Operations

Use async client when you need to perform multiple operations:

# Good: Concurrent async operations
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
    results = await asyncio.gather(
        client.actions.get_all(),
        client.spaces.get_all(),
        client.status.get_status()
    )

# Avoid: Sequential sync operations
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
    actions = client.actions.get_all()
    spaces = client.spaces.get_all()
    status = client.status.get_status()

4. Handle Errors Appropriately

Catch specific exceptions and handle them appropriately:

from kibana import Kibana
from kibana.exceptions import NotFoundError, AuthenticationException

with Kibana("http://localhost:5601", api_key="your_api_key") as client:
    try:
        connector = client.actions.get(id="connector-id")
    except NotFoundError:
        # Handle missing connector
        connector = client.actions.create(
            name="New Connector",
            connector_type_id=".index",
            config={"index": "logs"}
        )
    except AuthenticationException:
        # Handle auth failure
        print("Authentication failed, check credentials")
        raise

5. Configure Appropriate Timeouts

Set timeouts based on operation type:

client = Kibana(
    "http://localhost:5601",
    api_key="your_api_key",
    request_timeout=30.0  # Default timeout
)

# Short timeout for health checks
status = client.options(request_timeout=5.0).status.get_status()

# Long timeout for bulk operations
result = client.options(request_timeout=120.0).saved_objects.bulk_create(objects)

Next Steps