Advanced Usage¶
This guide covers advanced features and patterns for using the Kibana Python client effectively in production environments.
Per-Request Configuration¶
The options() method allows you to modify client settings for specific requests without creating a new client instance. This is useful for one-off configuration changes or when different operations require different settings.
Request Timeout¶
Override the default timeout for specific requests:
from kibana import Kibana
client = Kibana(
"http://localhost:5601",
api_key="your_api_key",
request_timeout=10.0 # Default 10 seconds
)
# Use longer timeout for potentially slow operation
result = client.options(request_timeout=60.0).actions.get_all()
# Use shorter timeout for quick health check
status = client.options(request_timeout=5.0).status.get_status()
Custom Headers¶
Add custom headers for specific requests:
# Add custom headers for request tracking
result = client.options(
headers={
"X-Request-ID": "unique-request-id",
"X-Custom-Header": "custom-value"
}
).actions.get_all()
# Add headers for debugging
result = client.options(
headers={"X-Debug": "true"}
).spaces.get_all()
Per-Request Authentication¶
Override authentication for specific requests:
# Initialize with default authentication
client = Kibana(
"http://localhost:5601",
api_key="default_api_key"
)
# Use different API key for specific request
result = client.options(
api_key="admin_api_key"
).actions.create(
name="Admin Connector",
connector_type_id=".index",
config={"index": "admin-logs"}
)
# Use basic auth for specific request
result = client.options(
basic_auth=("admin", "admin_password")
).spaces.create(
id="admin-space",
name="Admin Space"
)
Chaining Options¶
Options can be chained for complex configurations:
# Combine multiple options
result = client.options(
request_timeout=30.0,
headers={"X-Request-ID": "123"}
).options(
api_key="different_key"
).actions.get_all()
Note
Each call to options() creates a new client instance with the modified settings. The original client remains unchanged.
Connection Pooling¶
The Kibana client uses elastic-transport for connection management, which provides built-in connection pooling.
Connection Pool Configuration¶
from kibana import Kibana
client = Kibana(
"http://localhost:5601",
api_key="your_api_key",
# Connection pool settings
connections_per_node=10, # Max connections per node (default: 10)
max_retries=3, # Max retry attempts (default: 3)
retry_on_timeout=True, # Retry on timeout (default: True)
)
Multiple Hosts¶
Configure multiple Kibana hosts for high availability:
client = Kibana(
hosts=[
"http://kibana1.example.com:5601",
"http://kibana2.example.com:5601",
"http://kibana3.example.com:5601"
],
api_key="your_api_key"
)
The client will automatically distribute requests across available hosts and handle failover.
Connection Lifecycle¶
from kibana import Kibana
# Create client (connections are lazy-initialized)
client = Kibana("http://localhost:5601", api_key="your_api_key")
# Make requests (connections are created as needed)
status = client.status.get_status()
# Close client when done (releases connections)
client.close()
Using Context Managers¶
Context managers automatically handle connection lifecycle:
# Synchronous client
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
status = client.status.get_status()
# Client is automatically closed when exiting the context
# Asynchronous client
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
status = await client.status.get_status()
# Client is automatically closed when exiting the context
Retry Configuration¶
Configure retry behavior for failed requests:
Basic Retry Configuration¶
from kibana import Kibana
client = Kibana(
"http://localhost:5601",
api_key="your_api_key",
# Retry settings
max_retries=3, # Number of retry attempts
retry_on_timeout=True, # Retry on timeout errors
retry_on_status=[502, 503, 504], # Retry on these HTTP status codes
)
Custom Retry Logic¶
For more complex retry scenarios, implement custom retry logic:
import time
from kibana import Kibana
from kibana.exceptions import ApiError
def execute_with_retry(client, operation, max_attempts=3, backoff_factor=2):
"""Execute operation with exponential backoff retry."""
for attempt in range(max_attempts):
try:
return operation()
except ApiError as e:
if attempt == max_attempts - 1:
raise
# Calculate backoff time
wait_time = backoff_factor ** attempt
print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
# Usage
client = Kibana("http://localhost:5601", api_key="your_api_key")
result = execute_with_retry(
client,
lambda: client.actions.create(
name="My Connector",
connector_type_id=".index",
config={"index": "logs"}
)
)
TLS/SSL Configuration¶
Configure TLS/SSL settings for secure connections:
Basic TLS¶
from kibana import Kibana
client = Kibana(
"https://kibana.example.com:5601",
api_key="your_api_key",
verify_certs=True # Verify SSL certificates (default: True)
)
Custom CA Certificate¶
Provide a custom CA certificate bundle:
client = Kibana(
"https://kibana.example.com:5601",
api_key="your_api_key",
ca_certs="/path/to/ca-bundle.crt" # Path to CA certificate
)
Client Certificates¶
Use client certificates for mutual TLS authentication:
client = Kibana(
"https://kibana.example.com:5601",
api_key="your_api_key",
client_cert="/path/to/client.crt", # Client certificate
client_key="/path/to/client.key" # Client private key
)
Disable Certificate Verification¶
Warning
Only disable certificate verification for local development or testing. Never use this in production.
client = Kibana(
"https://localhost:5601",
api_key="your_api_key",
verify_certs=False # Disable certificate verification
)
SSL Context¶
For advanced SSL configuration, provide a custom SSL context:
import ssl
from kibana import Kibana
# Create custom SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
client = Kibana(
"https://localhost:5601",
api_key="your_api_key",
ssl_context=ssl_context
)
Async Client Patterns¶
The async client provides the same API as the synchronous client but with async/await support.
Basic Async Usage¶
import asyncio
from kibana import AsyncKibana
async def main():
client = AsyncKibana(
"http://localhost:5601",
api_key="your_api_key"
)
try:
# All methods are async
status = await client.status.get_status()
print(status.body)
finally:
await client.close()
asyncio.run(main())
Async Context Manager¶
async def main():
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
status = await client.status.get_status()
connectors = await client.actions.get_all()
# Client is automatically closed
Concurrent Operations¶
Execute multiple operations concurrently:
import asyncio
from kibana import AsyncKibana
async def main():
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
# Execute multiple operations concurrently
status, spaces, connectors = await asyncio.gather(
client.status.get_status(),
client.spaces.get_all(),
client.actions.get_all()
)
print(f"Status: {status.body['status']['overall']['level']}")
print(f"Spaces: {len(spaces.body)}")
print(f"Connectors: {len(connectors.body)}")
asyncio.run(main())
Concurrent Requests with Error Handling¶
async def main():
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
# Execute with error handling
results = await asyncio.gather(
client.status.get_status(),
client.spaces.get_all(),
client.actions.get_all(),
return_exceptions=True # Don't fail on first exception
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Operation {i} failed: {result}")
else:
print(f"Operation {i} succeeded")
Async Batch Processing¶
Process items in batches asynchronously:
async def process_connectors_batch(client, connector_ids):
"""Process a batch of connectors concurrently."""
tasks = [
client.actions.get(id=connector_id)
for connector_id in connector_ids
]
return await asyncio.gather(*tasks, return_exceptions=True)
async def main():
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
# Get all connector IDs
all_connectors = await client.actions.get_all()
connector_ids = [c["id"] for c in all_connectors.body]
# Process in batches of 10
batch_size = 10
for i in range(0, len(connector_ids), batch_size):
batch = connector_ids[i:i + batch_size]
results = await process_connectors_batch(client, batch)
print(f"Processed batch {i//batch_size + 1}: {len(results)} connectors")
Async Rate Limiting¶
Implement rate limiting for async operations:
import asyncio
from asyncio import Semaphore
async def rate_limited_operation(client, semaphore, connector_id):
"""Execute operation with rate limiting."""
async with semaphore:
return await client.actions.get(id=connector_id)
async def main():
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
# Limit to 5 concurrent requests
semaphore = Semaphore(5)
all_connectors = await client.actions.get_all()
connector_ids = [c["id"] for c in all_connectors.body]
# Execute with rate limiting
tasks = [
rate_limited_operation(client, semaphore, connector_id)
for connector_id in connector_ids
]
results = await asyncio.gather(*tasks)
print(f"Processed {len(results)} connectors with rate limiting")
Performance Optimization¶
Connection Reuse¶
Reuse client instances to benefit from connection pooling:
# Good: Reuse client
client = Kibana("http://localhost:5601", api_key="your_api_key")
for i in range(100):
result = client.actions.get_all()
# Process result
client.close()
# Avoid: Creating new client for each request
for i in range(100):
client = Kibana("http://localhost:5601", api_key="your_api_key")
result = client.actions.get_all()
client.close() # Inefficient!
Batch Operations¶
Use bulk operations when available:
# Good: Bulk create
objects = [
{"type": "config", "attributes": {"title": f"Config {i}"}}
for i in range(100)
]
result = client.saved_objects.bulk_create(objects)
# Avoid: Individual creates
for i in range(100):
client.saved_objects.create(
type="config",
attributes={"title": f"Config {i}"}
)
Disable Space Validation¶
For performance-critical scenarios, disable automatic space validation:
# Create space-scoped client without validation
fast_client = client.space("marketing", validate=False)
# Validation is skipped for all operations
connector = fast_client.actions.create(
name="Fast Connector",
connector_type_id=".index",
config={"index": "logs"}
)
Warning
Only disable validation when you’re certain the space exists. Invalid space IDs will result in API errors.
Caching¶
Implement caching for frequently accessed data:
from functools import lru_cache
from kibana import Kibana
class CachedKibanaClient:
def __init__(self, client):
self.client = client
@lru_cache(maxsize=128)
def get_connector(self, connector_id):
"""Get connector with caching."""
response = self.client.actions.get(id=connector_id)
return response.body
@lru_cache(maxsize=32)
def get_space(self, space_id):
"""Get space with caching."""
response = self.client.spaces.get(id=space_id)
return response.body
# Usage
client = Kibana("http://localhost:5601", api_key="your_api_key")
cached_client = CachedKibanaClient(client)
# First call hits API
connector = cached_client.get_connector("connector-id")
# Second call uses cache
connector = cached_client.get_connector("connector-id") # Fast!
Async for I/O-Bound Operations¶
Use async client for I/O-bound operations:
import asyncio
from kibana import AsyncKibana
async def fetch_all_data(client):
"""Fetch multiple resources concurrently."""
return await asyncio.gather(
client.status.get_status(),
client.spaces.get_all(),
client.actions.get_all(),
client.saved_objects.find(type="dashboard")
)
async def main():
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
# Much faster than sequential requests
status, spaces, connectors, dashboards = await fetch_all_data(client)
Request Timeout Tuning¶
Tune timeouts based on operation type:
client = Kibana("http://localhost:5601", api_key="your_api_key")
# Fast operations: short timeout
status = client.options(request_timeout=5.0).status.get_status()
# Bulk operations: longer timeout
result = client.options(request_timeout=60.0).saved_objects.bulk_create(objects)
# Export operations: very long timeout
export = client.options(request_timeout=300.0).saved_objects.export(
type=["dashboard", "visualization"]
)
Logging and Debugging¶
Enable Debug Logging¶
import logging
from kibana import Kibana
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# The client uses the "kibana" logger
logger = logging.getLogger("kibana")
logger.setLevel(logging.DEBUG)
client = Kibana("http://localhost:5601", api_key="your_api_key")
# All requests will be logged with details
status = client.status.get_status()
Custom Logging Handler¶
import logging
from kibana import Kibana
class RequestLogger(logging.Handler):
"""Custom handler to log requests to a file."""
def __init__(self, filename):
super().__init__()
self.file = open(filename, 'a')
def emit(self, record):
self.file.write(f"{record.getMessage()}\n")
self.file.flush()
# Add custom handler
logger = logging.getLogger("kibana")
logger.addHandler(RequestLogger("kibana_requests.log"))
logger.setLevel(logging.DEBUG)
client = Kibana("http://localhost:5601", api_key="your_api_key")
Request/Response Inspection¶
from kibana import Kibana
client = Kibana("http://localhost:5601", api_key="your_api_key")
# Make request
response = client.actions.get_all()
# Inspect response metadata
print(f"Status: {response.meta.status}")
print(f"Headers: {response.meta.headers}")
print(f"Duration: {response.meta.duration}")
# Inspect response body
print(f"Body: {response.body}")
Error Handling Patterns¶
Retry with Exponential Backoff¶
import time
from kibana import Kibana
from kibana.exceptions import ApiError, ServerError
def retry_with_backoff(func, max_attempts=3, base_delay=1):
"""Retry function with exponential backoff."""
for attempt in range(max_attempts):
try:
return func()
except ServerError as e:
if attempt == max_attempts - 1:
raise
delay = base_delay * (2 ** attempt)
print(f"Server error, retrying in {delay}s...")
time.sleep(delay)
client = Kibana("http://localhost:5601", api_key="your_api_key")
result = retry_with_backoff(
lambda: client.actions.get_all()
)
Graceful Degradation¶
from kibana import Kibana
from kibana.exceptions import ApiError
client = Kibana("http://localhost:5601", api_key="your_api_key")
try:
# Try to get connectors
connectors = client.actions.get_all()
print(f"Found {len(connectors.body)} connectors")
except ApiError as e:
# Fall back to empty list
print(f"Failed to fetch connectors: {e.message}")
connectors = []
Circuit Breaker Pattern¶
import time
from kibana import Kibana
from kibana.exceptions import ApiError
class CircuitBreaker:
"""Simple circuit breaker implementation."""
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = 0
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def call(self, func):
if self.state == "open":
if time.time() - self.last_failure_time > self.timeout:
self.state = "half-open"
else:
raise Exception("Circuit breaker is open")
try:
result = func()
self.on_success()
return result
except ApiError as e:
self.on_failure()
raise
def on_success(self):
self.failures = 0
self.state = "closed"
def on_failure(self):
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.failure_threshold:
self.state = "open"
# Usage
client = Kibana("http://localhost:5601", api_key="your_api_key")
breaker = CircuitBreaker()
try:
result = breaker.call(lambda: client.actions.get_all())
except Exception as e:
print(f"Request failed: {e}")
Best Practices¶
1. Use Context Managers¶
Always use context managers to ensure proper cleanup:
# Good
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
result = client.actions.get_all()
# Avoid
client = Kibana("http://localhost:5601", api_key="your_api_key")
result = client.actions.get_all()
# Forgot to call client.close()!
2. Reuse Client Instances¶
Create one client instance and reuse it:
# Good
client = Kibana("http://localhost:5601", api_key="your_api_key")
for i in range(100):
result = client.actions.get_all()
client.close()
# Avoid
for i in range(100):
client = Kibana("http://localhost:5601", api_key="your_api_key")
result = client.actions.get_all()
client.close()
3. Use Async for Concurrent Operations¶
Use async client when you need to perform multiple operations:
# Good: Concurrent async operations
async with AsyncKibana("http://localhost:5601", api_key="your_api_key") as client:
results = await asyncio.gather(
client.actions.get_all(),
client.spaces.get_all(),
client.status.get_status()
)
# Avoid: Sequential sync operations
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
actions = client.actions.get_all()
spaces = client.spaces.get_all()
status = client.status.get_status()
4. Handle Errors Appropriately¶
Catch specific exceptions and handle them appropriately:
from kibana import Kibana
from kibana.exceptions import NotFoundError, AuthenticationException
with Kibana("http://localhost:5601", api_key="your_api_key") as client:
try:
connector = client.actions.get(id="connector-id")
except NotFoundError:
# Handle missing connector
connector = client.actions.create(
name="New Connector",
connector_type_id=".index",
config={"index": "logs"}
)
except AuthenticationException:
# Handle auth failure
print("Authentication failed, check credentials")
raise
5. Configure Appropriate Timeouts¶
Set timeouts based on operation type:
client = Kibana(
"http://localhost:5601",
api_key="your_api_key",
request_timeout=30.0 # Default timeout
)
# Short timeout for health checks
status = client.options(request_timeout=5.0).status.get_status()
# Long timeout for bulk operations
result = client.options(request_timeout=120.0).saved_objects.bulk_create(objects)
Next Steps¶
Learn about Observability for distributed tracing
Explore Error Handling for comprehensive error management
Check Examples for practical code samples
Review API Reference for detailed API documentation