Comprehensive Index Connector Example

File: examples/connector_management.py

This example demonstrates production-ready patterns for managing index connectors with comprehensive error handling, logging, and best practices.

Purpose

Perfect for:

  • Building production applications

  • Understanding advanced patterns

  • Implementing robust error handling

  • Managing connector lifecycle

What You’ll Learn

  • Class-based connector management

  • Comprehensive error handling

  • Update and delete operations

  • Batch document writing

  • Logging and observability integration

  • Production-ready patterns

Code Overview

The example uses a class-based approach for better organization:

IndexConnectorExample Class

class IndexConnectorExample:
    """Example class for managing index connectors."""

    def __init__(self):
        """Initialize with automatic Kibana client configuration."""
        self.index_name = "miconnectedindex"
        self.connector_name = f"Index Connector - {self.index_name}"
        self.connector_id = None
        self.client = create_kibana_client()

Benefits of Class-Based Approach:

  • Encapsulates connector state

  • Reusable methods

  • Easier testing

  • Better organization

Key Features

1. Robust Connector Creation

def create_index_connector(self) -> dict[str, Any]:
    """Create an index connector with error handling."""
    try:
        config = {
            "index": self.index_name,
            "refresh": True,
            "executionTimeField": "@timestamp",
        }

        response = self.client.actions.create(
            name=self.connector_name,
            connector_type_id=".index",
            config=config,
        )

        connector = response.body
        self.connector_id = connector["id"]
        logger.info(f"Created connector: {self.connector_id}")
        return connector

    except ConflictError:
        logger.warning(f"Connector '{self.connector_name}' already exists")
        return self._find_existing_connector()
    except BadRequestError as e:
        logger.error(f"Invalid configuration: {e}")
        raise
    except KibanaException as e:
        logger.error(f"Failed to create connector: {e}")
        raise

Key Features:

  • Handles ConflictError by finding existing connector

  • Logs all operations

  • Provides detailed error messages

  • Returns connector data for further use

2. Document Execution

def execute_connector(self, document: dict[str, Any]) -> dict[str, Any]:
    """Execute connector to write a document."""
    if not self.connector_id:
        raise ValueError("Connector not created")

    # Add timestamp if not present
    if "@timestamp" not in document:
        document["@timestamp"] = datetime.utcnow().isoformat()

    params = {"documents": [document]}

    try:
        response = self.client.actions.execute(
            id=self.connector_id,
            params=params
        )
        result = response.body
        logger.info("Document written successfully")
        return result

    except BadRequestError as e:
        logger.error(f"Invalid parameters: {e}")
        raise
    except KibanaException as e:
        logger.error(f"Execution failed: {e}")
        raise

Key Features:

  • Validates connector exists

  • Automatically adds timestamp

  • Comprehensive error handling

  • Logs execution results

3. Batch Document Writing

def write_sample_data(self) -> None:
    """Write multiple sample documents."""
    sample_documents = [
        {
            "message": "Application started",
            "level": "INFO",
            "service": "web-server",
        },
        {
            "message": "Database connected",
            "level": "INFO",
            "service": "database",
        },
        {
            "message": "High memory usage",
            "level": "WARNING",
            "service": "monitoring",
        },
        {
            "message": "Authentication failed",
            "level": "ERROR",
            "service": "auth-service",
        },
    ]

    for i, doc in enumerate(sample_documents, 1):
        try:
            self.execute_connector(doc)
            logger.info(f"Document {i}/{len(sample_documents)} written")
        except Exception as e:
            logger.error(f"Failed to write document {i}: {e}")

Key Features:

  • Writes multiple documents

  • Continues on individual failures

  • Tracks progress

  • Logs each operation

4. Connector Updates

def update_connector(self, new_config: dict[str, Any]) -> dict[str, Any]:
    """Update connector configuration."""
    if not self.connector_id:
        raise ValueError("Connector not created")

    try:
        response = self.client.actions.update(
            id=self.connector_id,
            name=self.connector_name,
            config=new_config
        )
        result = response.body
        logger.info("Connector updated successfully")
        return result
    except KibanaException as e:
        logger.error(f"Update failed: {e}")
        raise

Key Features:

  • Updates connector configuration

  • Validates connector exists

  • Error handling

  • Logging

5. Connector Deletion

def delete_connector(self) -> None:
    """Delete the connector."""
    if not self.connector_id:
        raise ValueError("Connector not created")

    try:
        self.client.actions.delete(id=self.connector_id)
        logger.info("Connector deleted")
        self.connector_id = None
    except NotFoundError:
        logger.warning("Connector not found (may already be deleted)")
        self.connector_id = None
    except KibanaException as e:
        logger.error(f"Deletion failed: {e}")
        raise

Key Features:

  • Handles already-deleted connectors

  • Clears connector ID on success

  • Comprehensive error handling

Production Patterns

Pattern 1: Singleton Connector

Reuse a single connector across your application:

class ConnectorManager:
    _instance = None
    _connector_id = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_connector_id(self):
        if not self._connector_id:
            self._connector_id = self._create_connector()
        return self._connector_id

Pattern 2: Connection Pooling

Reuse the Kibana client:

class ConnectorPool:
    def __init__(self, max_connections=10):
        self.client = Kibana(
            "http://localhost:5601",
            connections_per_node=max_connections
        )

    def write_document(self, connector_id, document):
        return self.client.actions.execute(
            id=connector_id,
            params={"documents": [document]}
        )

Pattern 3: Retry Logic

Handle transient failures:

from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientConnector:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def execute_with_retry(self, connector_id, document):
        return self.client.actions.execute(
            id=connector_id,
            params={"documents": [document]}
        )

Pattern 4: Async Batch Writing

Write documents asynchronously:

import asyncio
from kibana import AsyncKibana

class AsyncConnectorWriter:
    async def write_batch(self, connector_id, documents):
        async with AsyncKibana("http://localhost:5601") as client:
            tasks = [
                client.actions.execute(
                    id=connector_id,
                    params={"documents": [doc]}
                )
                for doc in documents
            ]
            return await asyncio.gather(*tasks)

Error Handling Strategies

Strategy 1: Graceful Degradation

Continue operation even if some writes fail:

def write_documents_gracefully(self, documents):
    """Write documents with graceful degradation."""
    successful = 0
    failed = 0

    for doc in documents:
        try:
            self.execute_connector(doc)
            successful += 1
        except Exception as e:
            logger.error(f"Failed to write document: {e}")
            failed += 1

    logger.info(f"Wrote {successful} documents, {failed} failed")
    return successful, failed

Strategy 2: Circuit Breaker

Stop trying after repeated failures:

class CircuitBreaker:
    def __init__(self, failure_threshold=5):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.is_open = False

    def call(self, func, *args, **kwargs):
        if self.is_open:
            raise Exception("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
            self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.is_open = True
            raise

Strategy 3: Dead Letter Queue

Store failed writes for later retry:

class DeadLetterQueue:
    def __init__(self):
        self.failed_documents = []

    def write_with_dlq(self, connector_id, document):
        try:
            return self.client.actions.execute(
                id=connector_id,
                params={"documents": [document]}
            )
        except Exception as e:
            logger.error(f"Write failed, adding to DLQ: {e}")
            self.failed_documents.append({
                "document": document,
                "error": str(e),
                "timestamp": datetime.utcnow()
            })

    def retry_failed(self, connector_id):
        """Retry all failed documents."""
        for item in self.failed_documents[:]:
            try:
                self.client.actions.execute(
                    id=connector_id,
                    params={"documents": [item["document"]]}
                )
                self.failed_documents.remove(item)
            except Exception:
                pass

Logging and Observability

Structured Logging

import logging

logger = logging.getLogger(__name__)

# Log with structured data
logger.info(
    "Connector created",
    extra={
        "connector_id": connector_id,
        "connector_type": ".index",
        "target_index": "my-index",
        "operation": "create"
    }
)

OpenTelemetry Integration

from kibana import configure_opentelemetry

# Enable tracing
configure_opentelemetry(
    enabled=True,
    service_name="my-app",
    exporter="otlp",
    endpoint="http://localhost:4317"
)

# All connector operations are now traced
connector = client.actions.create(...)

Metrics Collection

from prometheus_client import Counter, Histogram

documents_written = Counter(
    'connector_documents_written_total',
    'Total documents written via connector'
)

write_duration = Histogram(
    'connector_write_duration_seconds',
    'Time spent writing documents'
)

@write_duration.time()
def write_document(self, document):
    result = self.execute_connector(document)
    documents_written.inc()
    return result

Running the Example

# Basic usage
python examples/connector_management.py

# With custom configuration
export KIBANA_URL="http://localhost:5601"
export KIBANA_API_KEY="your_api_key"
python examples/connector_management.py

# With debug logging
export LOG_LEVEL=DEBUG
python examples/connector_management.py

Expected Output

📊 Kibana Configuration:
   URL: http://localhost:5601
   Auth: Basic authentication

=== Kibana Index Connector Example ===

1. Available connector types:
✓ Found 15 connector types
  - .email: Email
  - .index: Index (Enabled: True)
  - .server-log: Server log
  ...

2. Existing connectors:
✓ Found 2 connectors
  - Production Connector (conn-123)
  - Test Connector (conn-456)

3. Creating index connector...
Created connector: Index Connector - miconnectedindex (ID: conn-789)
Target index: miconnectedindex

4. Connector details:
{
  "id": "conn-789",
  "name": "Index Connector - miconnectedindex",
  "connector_type_id": ".index",
  "config": {
    "index": "miconnectedindex",
    "refresh": true,
    "executionTimeField": "@timestamp"
  }
}

5. Writing sample data...
Document 1/4 written successfully
Document 2/4 written successfully
Document 3/4 written successfully
Document 4/4 written successfully

6. Updating connector configuration...
Connector updated successfully

7. Writing document with updated configuration...

=== Example completed successfully! ===
Check your Elasticsearch index 'miconnectedindex' for the written documents.

Connector 'Index Connector - miconnectedindex' was created for this example.
Delete the connector? (y/N):

Best Practices

1. Resource Management

Always clean up resources:

try:
    example = IndexConnectorExample()
    example.create_index_connector()
    # Use connector
finally:
    if example.connector_id:
        example.delete_connector()
    example.close()

2. Configuration Management

Use environment variables:

import os

config = {
    "index": os.getenv("TARGET_INDEX", "default-index"),
    "refresh": os.getenv("REFRESH_INDEX", "true").lower() == "true",
    "executionTimeField": os.getenv("TIMESTAMP_FIELD", "@timestamp")
}

3. Error Recovery

Implement retry logic for transient failures:

max_retries = 3
for attempt in range(max_retries):
    try:
        result = example.execute_connector(document)
        break
    except Exception as e:
        if attempt == max_retries - 1:
            raise
        logger.warning(f"Attempt {attempt + 1} failed, retrying...")
        time.sleep(2 ** attempt)

4. Monitoring

Track connector health:

def check_connector_health(self):
    """Verify connector is operational."""
    try:
        connector = self.get_connector_info()
        if connector.get('is_missing_secrets'):
            logger.error("Connector has missing secrets")
            return False
        if connector.get('is_deprecated'):
            logger.warning("Connector type is deprecated")
        return True
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return False

Next Steps

  1. Adapt for your use case: Modify the class for your specific needs

  2. Add monitoring: Integrate with your observability stack

  3. Implement retries: Add resilience for production

  4. Scale horizontally: Use multiple connectors for high throughput

Key Takeaways

Class-based design: Better organization and reusability

Comprehensive error handling: Handle all failure scenarios

Logging: Track all operations for debugging

Production patterns: Retry logic, circuit breakers, DLQ

Observability: Integration with tracing and metrics