Comprehensive Index Connector Example¶
File: examples/connector_management.py
This example demonstrates production-ready patterns for managing index connectors with comprehensive error handling, logging, and best practices.
Purpose¶
Perfect for:
Building production applications
Understanding advanced patterns
Implementing robust error handling
Managing connector lifecycle
What You’ll Learn¶
Class-based connector management
Comprehensive error handling
Update and delete operations
Batch document writing
Logging and observability integration
Production-ready patterns
Code Overview¶
The example uses a class-based approach for better organization:
IndexConnectorExample Class¶
class IndexConnectorExample:
    """Example class for managing index connectors."""

    def __init__(self):
        """Initialize with automatic Kibana client configuration."""
        self.index_name = "miconnectedindex"
        self.connector_name = f"Index Connector - {self.index_name}"
        self.connector_id = None
        self.client = create_kibana_client()
Benefits of Class-Based Approach:
Encapsulates connector state
Reusable methods
Easier testing
Better organization
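The "easier testing" benefit can be sketched as follows. This is a minimal illustration, not the example file's code: `ConnectorWriter` and `make_fake_client` are hypothetical names, and the sketch assumes the client is passed into the constructor instead of being built by `create_kibana_client()`.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock


class ConnectorWriter:
    """Hypothetical, trimmed-down stand-in for IndexConnectorExample."""

    def __init__(self, client):
        # Injecting the client lets a test pass a fake instead of a live Kibana.
        self.client = client
        self.connector_id = None

    def create(self, name, index):
        response = self.client.actions.create(
            name=name,
            connector_type_id=".index",
            config={"index": index},
        )
        self.connector_id = response.body["id"]
        return response.body


def make_fake_client(connector_id):
    """Build a mock whose actions.create() returns a canned response body."""
    client = MagicMock()
    client.actions.create.return_value = SimpleNamespace(body={"id": connector_id})
    return client


fake = make_fake_client("conn-test")
writer = ConnectorWriter(fake)
created = writer.create("Index Connector - demo", "demo-index")
```

Because all state lives on the instance, a test only needs to check `writer.connector_id` and the arguments recorded by the mock.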
Key Features¶
1. Robust Connector Creation¶
def create_index_connector(self) -> dict[str, Any]:
    """Create an index connector with error handling."""
    try:
        config = {
            "index": self.index_name,
            "refresh": True,
            "executionTimeField": "@timestamp",
        }
        response = self.client.actions.create(
            name=self.connector_name,
            connector_type_id=".index",
            config=config,
        )
        connector = response.body
        self.connector_id = connector["id"]
        logger.info(f"Created connector: {self.connector_id}")
        return connector
    except ConflictError:
        logger.warning(f"Connector '{self.connector_name}' already exists")
        return self._find_existing_connector()
    except BadRequestError as e:
        logger.error(f"Invalid configuration: {e}")
        raise
    except KibanaException as e:
        logger.error(f"Failed to create connector: {e}")
        raise
Key Features:
Handles ConflictError by finding the existing connector
Logs all operations
Provides detailed error messages
Returns connector data for further use
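The `_find_existing_connector()` helper is not shown in this walkthrough. One plausible shape for its matching logic is sketched below, assuming the connectors have already been fetched as a list of dictionaries with `name` and `connector_type_id` keys (the keys that appear in the connector details output). The function name `find_connector_by_name` is hypothetical.

```python
from typing import Any, Optional


def find_connector_by_name(
    connectors: list[dict[str, Any]],
    name: str,
    type_id: str = ".index",
) -> Optional[dict[str, Any]]:
    """Return the first connector matching both name and type, or None."""
    for connector in connectors:
        if connector.get("name") == name and connector.get("connector_type_id") == type_id:
            return connector
    return None


# Sample data shaped like the connector details shown in this guide.
existing = [
    {"id": "conn-123", "name": "Production Connector", "connector_type_id": ".email"},
    {
        "id": "conn-789",
        "name": "Index Connector - miconnectedindex",
        "connector_type_id": ".index",
    },
]
match = find_connector_by_name(existing, "Index Connector - miconnectedindex")
missing = find_connector_by_name(existing, "No Such Connector")
```

Matching on the type as well as the name avoids picking up a connector of a different type that happens to share the name.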
2. Document Execution¶
from datetime import datetime, timezone

def execute_connector(self, document: dict[str, Any]) -> dict[str, Any]:
    """Execute connector to write a document."""
    if not self.connector_id:
        raise ValueError("Connector not created")
    # Add a timezone-aware UTC timestamp if not present
    # (datetime.utcnow() is deprecated since Python 3.12)
    if "@timestamp" not in document:
        document["@timestamp"] = datetime.now(timezone.utc).isoformat()
    params = {"documents": [document]}
    try:
        response = self.client.actions.execute(
            id=self.connector_id,
            params=params
        )
        result = response.body
        logger.info("Document written successfully")
        return result
    except BadRequestError as e:
        logger.error(f"Invalid parameters: {e}")
        raise
    except KibanaException as e:
        logger.error(f"Execution failed: {e}")
        raise
Key Features:
Validates connector exists
Automatically adds timestamp
Comprehensive error handling
Logs execution results
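One side effect worth noting: the method above writes the timestamp into the caller's dictionary in place. If that is undesirable, a non-mutating variant could look like the sketch below. The helper name `with_timestamp` is hypothetical and not part of the example file.

```python
from datetime import datetime, timezone
from typing import Any


def with_timestamp(document: dict[str, Any], field: str = "@timestamp") -> dict[str, Any]:
    """Return a copy of the document with a UTC timestamp added if missing."""
    if field in document:
        # Already stamped: return a shallow copy so the result is always a new dict.
        return dict(document)
    return {**document, field: datetime.now(timezone.utc).isoformat()}


original = {"message": "Application started", "level": "INFO"}
stamped = with_timestamp(original)
preset = with_timestamp({"message": "x", "@timestamp": "2024-01-01T00:00:00+00:00"})
```

The caller's `original` dictionary is left untouched, which matters when the same document object is reused, for example when a failed write is retried later.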
3. Batch Document Writing¶
def write_sample_data(self) -> None:
    """Write multiple sample documents."""
    sample_documents = [
        {
            "message": "Application started",
            "level": "INFO",
            "service": "web-server",
        },
        {
            "message": "Database connected",
            "level": "INFO",
            "service": "database",
        },
        {
            "message": "High memory usage",
            "level": "WARNING",
            "service": "monitoring",
        },
        {
            "message": "Authentication failed",
            "level": "ERROR",
            "service": "auth-service",
        },
    ]
    for i, doc in enumerate(sample_documents, 1):
        try:
            self.execute_connector(doc)
            logger.info(f"Document {i}/{len(sample_documents)} written")
        except Exception as e:
            logger.error(f"Failed to write document {i}: {e}")
Key Features:
Writes multiple documents
Continues on individual failures
Tracks progress
Logs each operation
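The loop above sends one request per document. Since the `documents` parameter is a list, it may be worth grouping documents so that each execute call carries several of them. The sketch below only builds the `params` payloads. The helper names and the batch size are assumptions, and whether larger batches suit your deployment should be verified against your Kibana version.

```python
from typing import Any, Iterator


def chunked(items: list[dict[str, Any]], size: int) -> Iterator[list[dict[str, Any]]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_batches(documents: list[dict[str, Any]], size: int = 100) -> list[dict[str, Any]]:
    """Build one params payload per chunk, ready to pass to actions.execute()."""
    return [{"documents": chunk} for chunk in chunked(documents, size)]


docs = [{"message": f"event {i}"} for i in range(5)]
batches = build_batches(docs, size=2)
```

The trade-off is that a failure now affects a whole batch, so the per-document progress tracking shown above becomes per-batch tracking.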
4. Connector Updates¶
def update_connector(self, new_config: dict[str, Any]) -> dict[str, Any]:
    """Update connector configuration."""
    if not self.connector_id:
        raise ValueError("Connector not created")
    try:
        response = self.client.actions.update(
            id=self.connector_id,
            name=self.connector_name,
            config=new_config
        )
        result = response.body
        logger.info("Connector updated successfully")
        return result
    except KibanaException as e:
        logger.error(f"Update failed: {e}")
        raise
Key Features:
Updates connector configuration
Validates connector exists
Error handling
Logging
5. Connector Deletion¶
def delete_connector(self) -> None:
    """Delete the connector."""
    if not self.connector_id:
        raise ValueError("Connector not created")
    try:
        self.client.actions.delete(id=self.connector_id)
        logger.info("Connector deleted")
        self.connector_id = None
    except NotFoundError:
        logger.warning("Connector not found (may already be deleted)")
        self.connector_id = None
    except KibanaException as e:
        logger.error(f"Deletion failed: {e}")
        raise
Key Features:
Handles already-deleted connectors
Clears connector ID on success
Comprehensive error handling
Production Patterns¶
Pattern 1: Singleton Connector¶
Reuse a single connector across your application:
class ConnectorManager:
    _instance = None
    _connector_id = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_connector_id(self):
        # _create_connector() is not shown here; it is expected to create
        # the connector and return its ID.
        if not self._connector_id:
            self._connector_id = self._create_connector()
        return self._connector_id
Pattern 2: Connection Pooling¶
Reuse the Kibana client:
class ConnectorPool:
    def __init__(self, max_connections=10):
        self.client = Kibana(
            "http://localhost:5601",
            connections_per_node=max_connections
        )

    def write_document(self, connector_id, document):
        return self.client.actions.execute(
            id=connector_id,
            params={"documents": [document]}
        )
Pattern 3: Retry Logic¶
Handle transient failures:
from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientConnector:
    def __init__(self, client):
        # The Kibana client used by execute_with_retry()
        self.client = client

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def execute_with_retry(self, connector_id, document):
        return self.client.actions.execute(
            id=connector_id,
            params={"documents": [document]}
        )
Pattern 4: Async Batch Writing¶
Write documents asynchronously:
import asyncio
from kibana import AsyncKibana

class AsyncConnectorWriter:
    async def write_batch(self, connector_id, documents):
        async with AsyncKibana("http://localhost:5601") as client:
            tasks = [
                client.actions.execute(
                    id=connector_id,
                    params={"documents": [doc]}
                )
                for doc in documents
            ]
            return await asyncio.gather(*tasks)
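The pattern above starts every write at once, which can overwhelm the server for large batches, and a single failure makes `asyncio.gather` raise. A minimal sketch of a bounded variant follows. `write_bounded` and the `write_one` callable are hypothetical names, and the demo uses a fake writer in place of a real client.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def write_bounded(
    write_one: Callable[[dict[str, Any]], Awaitable[Any]],
    documents: list[dict[str, Any]],
    max_concurrency: int = 5,
) -> list[Any]:
    """Run write_one for each document with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(doc: dict[str, Any]) -> Any:
        async with semaphore:
            return await write_one(doc)

    # return_exceptions=True returns failures in place instead of raising,
    # so one failed write does not discard the other results.
    return await asyncio.gather(*(guarded(d) for d in documents), return_exceptions=True)


async def demo():
    """Exercise write_bounded with a fake writer that records peak concurrency."""
    in_flight = 0
    peak = 0

    async def fake_write(doc):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0)  # yield control, as a real network call would
        in_flight -= 1
        if doc.get("fail"):
            raise RuntimeError("simulated failure")
        return doc["message"]

    docs = [{"message": f"m{i}"} for i in range(6)] + [{"message": "bad", "fail": True}]
    results = await write_bounded(fake_write, docs, max_concurrency=2)
    return results, peak
```

Results come back in input order, so each exception object in the list can be matched to the document that caused it.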
Error Handling Strategies¶
Strategy 1: Graceful Degradation¶
Continue operation even if some writes fail:
def write_documents_gracefully(self, documents):
    """Write documents with graceful degradation."""
    successful = 0
    failed = 0
    for doc in documents:
        try:
            self.execute_connector(doc)
            successful += 1
        except Exception as e:
            logger.error(f"Failed to write document: {e}")
            failed += 1
    logger.info(f"Wrote {successful} documents, {failed} failed")
    return successful, failed
Strategy 2: Circuit Breaker¶
Stop trying after repeated failures:
class CircuitBreaker:
    def __init__(self, failure_threshold=5):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.is_open = False

    def call(self, func, *args, **kwargs):
        if self.is_open:
            raise Exception("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
            self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.is_open = True
            raise
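Note that the breaker above never closes again once it opens. A common extension is a reset timeout: after a cooling-off period, one trial call is allowed through (the "half-open" state). The sketch below illustrates that idea. `TimedCircuitBreaker`, `CircuitOpenError`, and the default timeout are assumptions, and the clock is injectable so the behavior can be checked without sleeping.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the breaker is open."""


class TimedCircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit breaker is open")
            # Timeout elapsed: fall through and allow one trial call (half-open).
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            if self.opened_at is not None or self.failure_count >= self.failure_threshold:
                # Open the circuit, or re-open it after a failed trial call.
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and clears the failure history.
        self.failure_count = 0
        self.opened_at = None
        return result


def failing_write():
    raise ValueError("simulated write failure")


now = [0.0]  # fake clock value, advanced manually
breaker = TimedCircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
for _ in range(2):
    try:
        breaker.call(failing_write)
    except ValueError:
        pass
```

After the two failures the breaker is open. Advancing `now[0]` past the timeout lets the next call through, and a success closes the circuit again.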
Strategy 3: Dead Letter Queue¶
Store failed writes for later retry:
from datetime import datetime, timezone

class DeadLetterQueue:
    def __init__(self, client):
        self.client = client
        self.failed_documents = []

    def write_with_dlq(self, connector_id, document):
        try:
            return self.client.actions.execute(
                id=connector_id,
                params={"documents": [document]}
            )
        except Exception as e:
            logger.error(f"Write failed, adding to DLQ: {e}")
            self.failed_documents.append({
                "document": document,
                "error": str(e),
                "timestamp": datetime.now(timezone.utc)
            })
            return None

    def retry_failed(self, connector_id):
        """Retry all failed documents."""
        # Iterate over a copy so items can be removed during the loop
        for item in self.failed_documents[:]:
            try:
                self.client.actions.execute(
                    id=connector_id,
                    params={"documents": [item["document"]]}
                )
                self.failed_documents.remove(item)
            except Exception as e:
                # Keep the item queued, but record why the retry failed
                logger.warning(f"Retry failed, keeping document in DLQ: {e}")
Logging and Observability¶
Structured Logging¶
import logging

logger = logging.getLogger(__name__)

# Log with structured data
logger.info(
    "Connector created",
    extra={
        "connector_id": connector_id,
        "connector_type": ".index",
        "target_index": "my-index",
        "operation": "create"
    }
)
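Fields passed through `extra` are attached to the log record, but the default formatter does not print them unless the format string names them. A minimal sketch of a formatter that emits them as JSON, using only the standard library, is shown below. The `JsonFormatter` class and the `connector_demo` logger name are illustrative.

```python
import io
import json
import logging

# Attribute names present on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {
    "message",
    "asctime",
}


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including `extra` fields."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # default=str keeps non-JSON-serializable values from raising.
        return json.dumps(payload, default=str)


# Demo: log to an in-memory stream so the output can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
demo_logger = logging.getLogger("connector_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False
demo_logger.info(
    "Connector created",
    extra={"connector_id": "conn-789", "operation": "create"},
)
```

In an application the handler would normally write to stdout or a file, where a log shipper can parse each line as JSON.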
OpenTelemetry Integration¶
from kibana import configure_opentelemetry

# Enable tracing
configure_opentelemetry(
    enabled=True,
    service_name="my-app",
    exporter="otlp",
    endpoint="http://localhost:4317"
)

# All connector operations are now traced
connector = client.actions.create(...)
Metrics Collection¶
from prometheus_client import Counter, Histogram

documents_written = Counter(
    'connector_documents_written_total',
    'Total documents written via connector'
)
write_duration = Histogram(
    'connector_write_duration_seconds',
    'Time spent writing documents'
)

@write_duration.time()
def write_document(self, document):
    result = self.execute_connector(document)
    documents_written.inc()
    return result
Running the Example¶
# Basic usage
python examples/connector_management.py
# With custom configuration
export KIBANA_URL="http://localhost:5601"
export KIBANA_API_KEY="your_api_key"
python examples/connector_management.py
# With debug logging
export LOG_LEVEL=DEBUG
python examples/connector_management.py
Expected Output¶
📊 Kibana Configuration:
URL: http://localhost:5601
Auth: Basic authentication
=== Kibana Index Connector Example ===
1. Available connector types:
✓ Found 15 connector types
- .email: Email
- .index: Index (Enabled: True)
- .server-log: Server log
...
2. Existing connectors:
✓ Found 2 connectors
- Production Connector (conn-123)
- Test Connector (conn-456)
3. Creating index connector...
Created connector: Index Connector - miconnectedindex (ID: conn-789)
Target index: miconnectedindex
4. Connector details:
{
  "id": "conn-789",
  "name": "Index Connector - miconnectedindex",
  "connector_type_id": ".index",
  "config": {
    "index": "miconnectedindex",
    "refresh": true,
    "executionTimeField": "@timestamp"
  }
}
5. Writing sample data...
Document 1/4 written successfully
Document 2/4 written successfully
Document 3/4 written successfully
Document 4/4 written successfully
6. Updating connector configuration...
Connector updated successfully
7. Writing document with updated configuration...
=== Example completed successfully! ===
Check your Elasticsearch index 'miconnectedindex' for the written documents.
Connector 'Index Connector - miconnectedindex' was created for this example.
Delete the connector? (y/N):
Best Practices¶
1. Resource Management¶
Always clean up resources:
# Construct outside the try block so `example` is always bound in `finally`
example = IndexConnectorExample()
try:
    example.create_index_connector()
    # Use connector
finally:
    if example.connector_id:
        example.delete_connector()
    example.close()
2. Configuration Management¶
Use environment variables:
import os

config = {
    "index": os.getenv("TARGET_INDEX", "default-index"),
    "refresh": os.getenv("REFRESH_INDEX", "true").lower() == "true",
    "executionTimeField": os.getenv("TIMESTAMP_FIELD", "@timestamp")
}
3. Error Recovery¶
Implement retry logic for transient failures:
import time

max_retries = 3
for attempt in range(max_retries):
    try:
        result = example.execute_connector(document)
        break
    except Exception as e:
        # Re-raise once the final attempt has failed
        if attempt == max_retries - 1:
            raise
        logger.warning(f"Attempt {attempt + 1} failed ({e}), retrying...")
        # Exponential backoff: 1s, then 2s
        time.sleep(2 ** attempt)
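The loop above retries on any exception, including errors that will not go away on their own, such as invalid parameters. A sketch of a helper that retries only selected exception types and adds random jitter to the backoff is shown below. `retry_call` is a hypothetical name, and the built-in `ConnectionError` and `TimeoutError` stand in for whichever exceptions your client raises on transient failures.

```python
import random
import time


def retry_call(
    func,
    max_retries=3,
    base=1.0,
    cap=30.0,
    retryable=(ConnectionError, TimeoutError),
    sleep=time.sleep,
):
    """Call func(), retrying only on `retryable` errors with jittered backoff."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return func()
        except retryable:
            if attempt == max_retries - 1:
                raise
            # Full jitter: a random delay up to the capped exponential bound.
            sleep(random.uniform(0, min(cap, base * 2 ** attempt)))


# Demo: a function that fails twice with a transient error, then succeeds.
calls = {"count": 0}


def flaky_write():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


slept = []  # record delays instead of actually sleeping
outcome = retry_call(flaky_write, max_retries=3, sleep=slept.append)
```

Jitter spreads out retries from many clients so they do not all hit the server at the same moment. Exceptions outside `retryable` propagate immediately without any delay.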
4. Monitoring¶
Track connector health:
def check_connector_health(self):
    """Verify connector is operational."""
    try:
        connector = self.get_connector_info()
        if connector.get('is_missing_secrets'):
            logger.error("Connector has missing secrets")
            return False
        if connector.get('is_deprecated'):
            logger.warning("Connector type is deprecated")
        return True
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return False
Next Steps¶
Adapt for your use case: Modify the class for your specific needs
Add monitoring: Integrate with your observability stack
Implement retries: Add resilience for production
Scale horizontally: Use multiple connectors for high throughput
Key Takeaways¶
✅ Class-based design: Better organization and reusability
✅ Comprehensive error handling: Handle conflicts, invalid requests, missing connectors, and general API failures
✅ Logging: Track all operations for debugging
✅ Production patterns: Retry logic, circuit breakers, DLQ
✅ Observability: Integration with tracing and metrics