Production-Ready Go Client for Langfuse

Comprehensive LLM observability with enterprise-grade reliability, asynchronous processing, and minimal performance overhead.

package main

import (
    "context"
    "github.com/bdpiprava/GoLangfuse"
    "github.com/bdpiprava/GoLangfuse/config"
)

func main() {
    // Initialize from environment variables
    lf, err := langfuse.NewFromEnv()
    if err != nil {
        panic(err)
    }
    defer lf.Shutdown(context.Background())

    // Track LLM interactions
    traceID := lf.AddEvent(ctx, &types.TraceEvent{
        Name: "chat-completion",
        UserID: "user-123",
        Metadata: map[string]interface{}{
            "model": "gpt-4",
        },
    })
}

Key Features

Asynchronous Processing

Non-blocking event tracking with configurable worker goroutines and intelligent batching.

🔄

Retry Logic

Built-in exponential backoff retry mechanism with configurable maximum attempts.

📊

Comprehensive Monitoring

Health checks, metrics collection, and performance tracking out of the box.

🛡️

Production Ready

Enterprise-grade reliability with structured error handling and graceful shutdown.

🎯

Type Safe

Strongly typed event system with builder patterns for clean, maintainable code.

⚙️

Configurable

Extensive configuration options with sensible defaults and environment variable support.

Quick Start

1. Installation

go get github.com/bdpiprava/GoLangfuse

2. Environment Setup

Set your Langfuse credentials:

export LANGFUSE_URL="https://cloud.langfuse.com"
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."

3. Basic Usage

package main

import (
    "context"
    "log"
    
    "github.com/bdpiprava/GoLangfuse"
    "github.com/bdpiprava/GoLangfuse/types"
)

// main is the quick-start program: it creates a client from the environment,
// records a trace plus one LLM generation attached to it, and flushes pending
// events on exit.
func main() {
    // Initialize client from environment
    lf, err := langfuse.NewFromEnv()
    if err != nil {
        log.Fatal("Failed to initialize Langfuse:", err)
    }
    // Stop is the shutdown method declared on the Langfuse interface; it
    // flushes pending events, so report a failure instead of dropping it.
    defer func() {
        if err := lf.Stop(context.Background()); err != nil {
            log.Printf("Shutdown error: %v", err)
        }
    }()

    ctx := context.Background()

    // Create a trace for a user interaction
    traceID := lf.AddEvent(ctx, &types.TraceEvent{
        Name:   "chat-completion",
        UserID: "user-123",
        Metadata: map[string]interface{}{
            "session_id": "session-456",
            "model":      "gpt-4",
        },
    })

    // Track LLM generation; TraceID links it to the trace created above.
    lf.AddEvent(ctx, &types.GenerationEvent{
        Name:    "openai-completion",
        TraceID: traceID,
        Model:   "gpt-4",
        Input: map[string]interface{}{
            "messages": []map[string]string{
                {"role": "user", "content": "Hello, world!"},
            },
        },
        Output: map[string]interface{}{
            "content": "Hello! How can I help you today?",
        },
        Usage: &types.Usage{
            InputTokens:  12,
            OutputTokens: 8,
            TotalTokens:  20,
        },
        Metadata: map[string]interface{}{
            "temperature": 0.7,
        },
    })

    log.Println("Events tracked successfully!")
}

API Reference

Core Interface

type Langfuse interface

Main interface for interacting with the Langfuse API.

// Langfuse is the main interface for interacting with the Langfuse API.
type Langfuse interface {
    // AddEvent asynchronously tracks an event. ctx carries cancellation and
    // timeouts; the returned ID is used for tracking and correlation.
    AddEvent(ctx context.Context, event types.LangfuseEvent) *uuid.UUID
    // Stop gracefully shuts down the client, flushing all pending events.
    // ctx bounds the shutdown; an error is returned if the operation fails.
    Stop(ctx context.Context) error
    // GetMetrics returns current performance metrics and statistics.
    GetMetrics() Metrics
    // GetHealthStatus returns the current health status without performing checks.
    GetHealthStatus() HealthStatus
    // CheckHealth performs an active health check against the Langfuse API.
    CheckHealth(ctx context.Context) HealthStatus
}

Factory Functions

NewFromEnv() (Langfuse, error)

Creates a new Langfuse client using environment variables.

Environment Variables:
  • LANGFUSE_URL - Langfuse server URL
  • LANGFUSE_PUBLIC_KEY - Public API key
  • LANGFUSE_SECRET_KEY - Secret API key
Returns:
  • Langfuse - Configured client instance
  • error - Configuration or validation error
lf, err := langfuse.NewFromEnv()
if err != nil {
    return fmt.Errorf("failed to initialize: %w", err)
}

New(config *config.Langfuse) Langfuse

Creates a new Langfuse client with explicit configuration.

Parameters:
  • config - Configuration struct with all settings
cfg := &config.Langfuse{
    URL:       "https://cloud.langfuse.com",
    PublicKey: "pk-lf-...",
    SecretKey: "sk-lf-...",
    BatchSize: 20,
    BatchTimeout: 10 * time.Second,
}
lf := langfuse.New(cfg)

NewWithClient(config *config.Langfuse, client *http.Client) Langfuse

Creates a client with custom HTTP client for advanced networking requirements.

Parameters:
  • config - Configuration struct
  • client - Custom HTTP client with desired settings
httpClient := &http.Client{
    Timeout: 60 * time.Second,
    Transport: &http.Transport{
        MaxIdleConns: 200,
    },
}
lf := langfuse.NewWithClient(cfg, httpClient)

Core Methods

AddEvent(ctx context.Context, event types.LangfuseEvent) *uuid.UUID

Asynchronously tracks an event in Langfuse.

Parameters:
  • ctx - Context for cancellation and timeouts
  • event - Event implementing LangfuseEvent interface
Returns:
  • *uuid.UUID - Event ID for tracking and correlation
eventID := lf.AddEvent(ctx, &types.TraceEvent{
    Name: "user-interaction",
    UserID: "user-123",
})

Stop(ctx context.Context) error

Gracefully shuts down the client, flushing all pending events.

Parameters:
  • ctx - Context with timeout for shutdown operation
Returns:
  • error - Shutdown error if operation fails
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := lf.Stop(ctx); err != nil {
    log.Printf("Shutdown error: %v", err)
}

Monitoring Methods

GetMetrics() Metrics

Returns current performance metrics and statistics.

metrics := lf.GetMetrics()
fmt.Printf("Events processed: %d\n", metrics.EventsProcessed)
fmt.Printf("Success rate: %.2f%%\n", metrics.SuccessRate)

GetHealthStatus() HealthStatus

Returns current health status without performing checks.

health := lf.GetHealthStatus()
if health.Status != "healthy" {
    log.Printf("Health issue: %s", health.Message)
}

CheckHealth(ctx context.Context) HealthStatus

Performs active health check against Langfuse API.

health := lf.CheckHealth(ctx)
fmt.Printf("API Status: %s\n", health.Status)

Configuration

Configuration Structure

// Langfuse holds the client configuration. Each field names the environment
// variable it is read from in its envconfig tag; the default tag gives the
// value used when that variable is unset.
type Langfuse struct {
    // Authentication (Required)
    // URL is the Langfuse server endpoint URL.
    URL       string `envconfig:"LANGFUSE_URL" validate:"required"`
    // PublicKey is the public API key for authentication.
    PublicKey string `envconfig:"LANGFUSE_PUBLIC_KEY" validate:"required"`
    // SecretKey is the secret API key for authentication.
    SecretKey string `envconfig:"LANGFUSE_SECRET_KEY" validate:"required"`
    
    // Performance Tuning
    // NumberOfEventProcessor is the number of concurrent worker goroutines.
    NumberOfEventProcessor int           `envconfig:"LANGFUSE_EVENT_PROCESSORS" default:"1"`
    // BatchSize is the maximum number of events per batch request.
    BatchSize             int           `envconfig:"LANGFUSE_BATCH_SIZE" default:"10"`
    // BatchTimeout is the maximum time to wait before sending a partial batch.
    BatchTimeout          time.Duration `envconfig:"LANGFUSE_BATCH_TIMEOUT" default:"5s"`
    // MaxRetries is the maximum number of retry attempts for failed requests.
    MaxRetries            int           `envconfig:"LANGFUSE_MAX_RETRIES" default:"3"`
    
    // HTTP Settings
    // Timeout is the HTTP request timeout.
    Timeout                time.Duration `envconfig:"LANGFUSE_TIMEOUT" default:"30s"`
    // MaxIdleConns is the maximum number of idle HTTP connections.
    MaxIdleConns          int           `envconfig:"LANGFUSE_MAX_IDLE_CONNS" default:"100"`
    // NOTE(review): the three fields below are not described in the options
    // table; presumably they map to the http.Transport fields of the same
    // name (MaxIdleConnsPerHost, MaxConnsPerHost, IdleConnTimeout) — confirm.
    MaxIdleConnsPerHost   int           `envconfig:"LANGFUSE_MAX_IDLE_CONNS_PER_HOST" default:"10"`
    MaxConnsPerHost       int           `envconfig:"LANGFUSE_MAX_CONNS_PER_HOST" default:"100"`
    IdleConnTimeout       time.Duration `envconfig:"LANGFUSE_IDLE_CONN_TIMEOUT" default:"90s"`
    
    // Debugging
    // Debug enables debug logging.
    Debug bool `envconfig:"LANGFUSE_DEBUG" default:"false"`
}

Configuration Options

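Parameter | Environment Variable | Default | Description
URL | LANGFUSE_URL | Required | Langfuse server endpoint URL
PublicKey | LANGFUSE_PUBLIC_KEY | Required | Public API key for authentication
SecretKey | LANGFUSE_SECRET_KEY | Required | Secret API key for authentication
NumberOfEventProcessor | LANGFUSE_EVENT_PROCESSORS | 1 | Number of concurrent worker goroutines
BatchSize | LANGFUSE_BATCH_SIZE | 10 | Maximum events per batch request
BatchTimeout | LANGFUSE_BATCH_TIMEOUT | 5s | Maximum time to wait before sending a partial batch
MaxRetries | LANGFUSE_MAX_RETRIES | 3 | Maximum retry attempts for failed requests
Timeout | LANGFUSE_TIMEOUT | 30s | HTTP request timeout
MaxIdleConns | LANGFUSE_MAX_IDLE_CONNS | 100 | Maximum idle HTTP connections
MaxIdleConnsPerHost | LANGFUSE_MAX_IDLE_CONNS_PER_HOST | 10 | Maximum idle HTTP connections kept per host
MaxConnsPerHost | LANGFUSE_MAX_CONNS_PER_HOST | 100 | Maximum HTTP connections per host
IdleConnTimeout | LANGFUSE_IDLE_CONN_TIMEOUT | 90s | How long an idle HTTP connection is kept before it is closed
Debug | LANGFUSE_DEBUG | false | Enable debug logging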

Configuration Examples

Environment Variables

# Required
export LANGFUSE_URL="https://cloud.langfuse.com"
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."

# Performance Tuning
export LANGFUSE_EVENT_PROCESSORS="3"
export LANGFUSE_BATCH_SIZE="20"
export LANGFUSE_BATCH_TIMEOUT="10s"

# High-Throughput Setup
export LANGFUSE_MAX_IDLE_CONNS="200"
export LANGFUSE_TIMEOUT="60s"

Programmatic Configuration

cfg := &config.Langfuse{
    URL:       "https://cloud.langfuse.com",
    PublicKey: "pk-lf-...",
    SecretKey: "sk-lf-...",
    
    // Optimize for high throughput
    NumberOfEventProcessor: 5,
    BatchSize:             50,
    BatchTimeout:          2 * time.Second,
    
    // Production HTTP settings
    Timeout:        60 * time.Second,
    MaxIdleConns:   200,
    MaxRetries:     5,
}

lf := langfuse.New(cfg)

Examples

Complete LLM Workflow

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "github.com/bdpiprava/GoLangfuse"
    "github.com/bdpiprava/GoLangfuse/types"
)

// main walks through a complete LLM workflow: one trace for the conversation,
// a span for input processing, a generation nested under that span, a score
// attached to the generation, and a final update of the trace with its outcome.
func main() {
    // Initialize client
    lf, err := langfuse.NewFromEnv()
    if err != nil {
        log.Fatal("Failed to initialize:", err)
    }
    // Stop is the shutdown method declared on the Langfuse interface; it
    // flushes pending events, so report a failure instead of dropping it.
    defer func() {
        if err := lf.Stop(context.Background()); err != nil {
            log.Printf("Shutdown error: %v", err)
        }
    }()

    ctx := context.Background()

    // 1. Create a trace for the entire conversation
    traceID := lf.AddEvent(ctx, &types.TraceEvent{
        Name:      "chat-conversation",
        UserID:    "user-123",
        SessionID: "session-456",
        Tags:      []string{"chat", "customer-support"},
        Metadata: map[string]interface{}{
            "channel": "web",
            "version": "1.0",
        },
    })

    // 2. Track user input processing
    spanID := lf.AddEvent(ctx, &types.SpanEvent{
        Name:    "input-processing",
        TraceID: traceID,
        Input: map[string]interface{}{
            "user_message": "How do I reset my password?",
        },
        Metadata: map[string]interface{}{
            "intent_detected": "password_reset",
            "confidence":      0.95,
        },
    })

    // 3. Track LLM generation, nested under the input-processing span
    generationID := lf.AddEvent(ctx, &types.GenerationEvent{
        Name:     "openai-completion",
        TraceID:  traceID,
        ParentID: spanID,
        Model:    "gpt-4",
        ModelParameters: map[string]interface{}{
            "temperature": 0.7,
            "max_tokens":  500,
        },
        Input: map[string]interface{}{
            "messages": []map[string]string{
                {"role": "system", "content": "You are a helpful customer support assistant."},
                {"role": "user", "content": "How do I reset my password?"},
            },
        },
        Output: map[string]interface{}{
            "content": "To reset your password, please follow these steps: 1. Go to the login page...",
        },
        Usage: &types.Usage{
            InputTokens:  45,
            OutputTokens: 120,
            TotalTokens:  165,
        },
        StartTime: time.Now().Add(-2 * time.Second),
        EndTime:   time.Now(),
    })

    // 4. Track response quality
    lf.AddEvent(ctx, &types.ScoreEvent{
        Name:          "response-quality",
        TraceID:       traceID,
        ObservationID: generationID,
        Value:         0.85,
        Comment:       "Helpful and accurate response",
        Source:        "user_feedback",
    })

    // 5. Update trace with final outcome. Reusing the ID targets the trace
    // created in step 1; Name is repeated because TraceEvent marks it required.
    lf.AddEvent(ctx, &types.TraceEvent{
        ID:   traceID,
        Name: "chat-conversation",
        Output: map[string]interface{}{
            "resolution_status": "resolved",
            "user_satisfaction": "high",
        },
        EndTime: time.Now(),
    })

    fmt.Println("Complete workflow tracked successfully!")
}

Batch Processing

// processBatch records one trace for the whole batch and, for every request,
// a span plus a generation nested under that span. Each request is run
// through processWithLLM between the two events.
func processBatch(lf langfuse.Langfuse, requests []UserRequest) {
    ctx := context.Background()

    // A single trace groups every event emitted for this batch.
    batchMeta := map[string]interface{}{
        "batch_size": len(requests),
        "timestamp":  time.Now(),
    }
    batchTraceID := lf.AddEvent(ctx, &types.TraceEvent{
        Name:     "batch-processing",
        Metadata: batchMeta,
    })

    for idx := range requests {
        req := requests[idx]

        // The span marks this request's slice of the batch.
        span := &types.SpanEvent{
            Name:    fmt.Sprintf("process-request-%d", idx),
            TraceID: batchTraceID,
            Input: map[string]interface{}{
                "request_id": req.ID,
                "user_id":    req.UserID,
            },
        }
        spanID := lf.AddEvent(ctx, span)

        // Run the request through the LLM.
        result := processWithLLM(req)

        // The generation hangs off the span via ParentID.
        generation := &types.GenerationEvent{
            Name:     "batch-generation",
            TraceID:  batchTraceID,
            ParentID: spanID,
            Model:    "gpt-3.5-turbo",
            Input:    map[string]interface{}{"prompt": req.Prompt},
            Output:   map[string]interface{}{"result": result},
            Usage: &types.Usage{
                InputTokens:  req.TokenCount,
                OutputTokens: result.TokenCount,
            },
        }
        lf.AddEvent(ctx, generation)
    }
}

Error Handling & Monitoring

// monitoredOperation runs performOperation under a 30-second context. It
// returns early if the active health check is not "healthy", records a trace,
// records an error-level span when the operation fails, logs the current
// metrics, and finally stops the client with a 10-second shutdown budget.
func monitoredOperation(lf langfuse.Langfuse) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Guard: do nothing unless the API reports itself healthy.
    if status := lf.CheckHealth(ctx); status.Status != "healthy" {
        log.Printf("API health check failed: %s", status.Message)
        return
    }

    // Trace covering the operation.
    traceID := lf.AddEvent(ctx, &types.TraceEvent{Name: "monitored-operation"})

    // A failure is recorded as an error-level span on that trace.
    if opErr := performOperation(); opErr != nil {
        failure := &types.SpanEvent{
            Name:          "error-handling",
            TraceID:       traceID,
            Level:         types.LevelError,
            StatusMessage: opErr.Error(),
            Metadata: map[string]interface{}{
                "error_type": "operation_failed",
                "timestamp":  time.Now(),
            },
        }
        lf.AddEvent(ctx, failure)
    }

    // Report the current metrics.
    stats := lf.GetMetrics()
    log.Printf("Events processed: %d, Success rate: %.2f%%",
        stats.EventsProcessed, stats.SuccessRate)

    // Shutdown gets its own, shorter deadline, independent of ctx.
    stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer stopCancel()

    stopErr := lf.Stop(stopCtx)
    if stopErr != nil {
        log.Printf("Shutdown warning: %v", stopErr)
    }
}

Custom HTTP Client

// withCustomHTTPClient builds a Langfuse client on top of a caller-configured
// http.Client (connection pool sizes, proxy, TLS settings) and records one
// trace with it.
func withCustomHTTPClient() {
    // Create custom HTTP client for specific requirements
    transport := &http.Transport{
        MaxIdleConns:        200,
        MaxIdleConnsPerHost: 20,
        IdleConnTimeout:     120 * time.Second,
        TLSHandshakeTimeout: 10 * time.Second,

        // Custom proxy if needed
        Proxy: http.ProxyFromEnvironment,

        // Custom TLS config
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: false,
            MinVersion:         tls.VersionTLS12,
        },
    }

    httpClient := &http.Client{
        Transport: transport,
        Timeout:   60 * time.Second,
    }

    cfg := &config.Langfuse{
        URL:       "https://cloud.langfuse.com",
        PublicKey: os.Getenv("LANGFUSE_PUBLIC_KEY"),
        SecretKey: os.Getenv("LANGFUSE_SECRET_KEY"),
        BatchSize: 25,
    }

    lf := langfuse.NewWithClient(cfg, httpClient)
    // Stop is the shutdown method declared on the Langfuse interface. Its
    // error is deliberately discarded here: this is a best-effort flush at
    // the end of a short example.
    defer lf.Stop(context.Background())

    // Use as normal
    lf.AddEvent(context.Background(), &types.TraceEvent{
        Name: "custom-client-trace",
    })
}

Event Types

TraceEvent

Represents a complete user interaction or workflow.

// TraceEvent represents a complete user interaction or workflow.
type TraceEvent struct {
    // ID identifies the trace. It is a pointer with omitempty, so a nil ID is
    // left out of the JSON payload.
    ID        *uuid.UUID             `json:"id,omitempty"`
    // Name is the only field marked required by its validate tag.
    Name      string                 `json:"name" validate:"required"`
    UserID    string                 `json:"userId,omitempty"`
    SessionID string                 `json:"sessionId,omitempty"`
    Version   string                 `json:"version,omitempty"`
    Release   string                 `json:"release,omitempty"`
    Input     map[string]interface{} `json:"input,omitempty"`
    Output    map[string]interface{} `json:"output,omitempty"`
    Metadata  map[string]interface{} `json:"metadata,omitempty"`
    Tags      []string               `json:"tags,omitempty"`
    Public    bool                   `json:"public,omitempty"`
    // StartTime is serialized under the JSON key "timestamp", not "startTime".
    // NOTE(review): with encoding/json, omitempty never omits a time.Time
    // struct value, so a zero StartTime/EndTime would still be emitted —
    // confirm how zero times are handled.
    StartTime time.Time              `json:"timestamp,omitempty"`
    EndTime   time.Time              `json:"endTime,omitempty"`
}

GenerationEvent

Tracks LLM API calls with comprehensive metrics.

// GenerationEvent tracks an LLM API call with comprehensive metrics.
type GenerationEvent struct {
    ID              *uuid.UUID             `json:"id,omitempty"`
    // TraceID is the ID of the trace this generation belongs to.
    TraceID         *uuid.UUID             `json:"traceId,omitempty"`
    // ParentID is serialized as "parentObservationId"; it nests this
    // generation under another observation such as a span.
    ParentID        *uuid.UUID             `json:"parentObservationId,omitempty"`
    // Name is the only field marked required by its validate tag.
    Name            string                 `json:"name" validate:"required"`
    // NOTE(review): with encoding/json, omitempty never omits a time.Time
    // struct value, so zero times would still be emitted — confirm how zero
    // times are handled.
    StartTime       time.Time              `json:"startTime,omitempty"`
    EndTime         time.Time              `json:"endTime,omitempty"`
    // CompletionTime is serialized under the JSON key "completionStartTime".
    CompletionTime  time.Time              `json:"completionStartTime,omitempty"`
    Model           string                 `json:"model,omitempty"`
    ModelParameters map[string]interface{} `json:"modelParameters,omitempty"`
    Input           map[string]interface{} `json:"input,omitempty"`
    Output          map[string]interface{} `json:"output,omitempty"`
    // Usage carries token counts and costs; nil is left out of the payload.
    Usage           *Usage                 `json:"usage,omitempty"`
    Level           Level                  `json:"level,omitempty"`
    StatusMessage   string                 `json:"statusMessage,omitempty"`
    Version         string                 `json:"version,omitempty"`
    Metadata        map[string]interface{} `json:"metadata,omitempty"`
    PromptName      string                 `json:"promptName,omitempty"`
    PromptVersion   string                 `json:"promptVersion,omitempty"`
}

SpanEvent

Tracks individual processing steps within a trace.

// SpanEvent tracks an individual processing step within a trace.
type SpanEvent struct {
    ID            *uuid.UUID             `json:"id,omitempty"`
    // TraceID is the ID of the trace this span belongs to.
    TraceID       *uuid.UUID             `json:"traceId,omitempty"`
    // ParentID is serialized as "parentObservationId".
    ParentID      *uuid.UUID             `json:"parentObservationId,omitempty"`
    // Name is the only field marked required by its validate tag.
    Name          string                 `json:"name" validate:"required"`
    // NOTE(review): with encoding/json, omitempty never omits a time.Time
    // struct value, so zero times would still be emitted — confirm how zero
    // times are handled.
    StartTime     time.Time              `json:"startTime,omitempty"`
    EndTime       time.Time              `json:"endTime,omitempty"`
    Input         map[string]interface{} `json:"input,omitempty"`
    Output        map[string]interface{} `json:"output,omitempty"`
    // Level and StatusMessage describe the step's outcome; the error-handling
    // example sets them to types.LevelError and the error text.
    Level         Level                  `json:"level,omitempty"`
    StatusMessage string                 `json:"statusMessage,omitempty"`
    Version       string                 `json:"version,omitempty"`
    Metadata      map[string]interface{} `json:"metadata,omitempty"`
}

ScoreEvent

Tracks quality metrics and performance scores.

// ScoreEvent tracks quality metrics and performance scores.
type ScoreEvent struct {
    ID             *uuid.UUID `json:"id,omitempty"`
    // TraceID is the ID of the trace being scored.
    TraceID        *uuid.UUID `json:"traceId,omitempty"`
    // ObservationID optionally narrows the score to one observation, such as
    // a generation; nil is left out of the payload.
    ObservationID  *uuid.UUID `json:"observationId,omitempty"`
    Name           string     `json:"name" validate:"required"`
    // Value is always serialized (no omitempty).
    // NOTE(review): if validation uses go-playground/validator, `required`
    // rejects the zero value, so a legitimate score of 0 would fail — confirm.
    Value          float64    `json:"value" validate:"required"`
    DataType       DataType   `json:"dataType,omitempty"`
    Comment        string     `json:"comment,omitempty"`
    Source         string     `json:"source,omitempty"`
    ConfigID       string     `json:"configId,omitempty"`
}

Usage

Token usage and cost tracking.

// Usage holds token usage and cost tracking for a generation. Every field
// carries omitempty, so zero values are left out of the JSON payload.
type Usage struct {
    // InputTokens, OutputTokens and TotalTokens are serialized under the
    // short JSON keys "input", "output" and "total".
    InputTokens        int     `json:"input,omitempty"`
    OutputTokens       int     `json:"output,omitempty"`
    TotalTokens        int     `json:"total,omitempty"`
    // NOTE(review): accepted Unit values are not shown here — confirm
    // against the Langfuse API.
    Unit               string  `json:"unit,omitempty"`
    InputCost          float64 `json:"inputCost,omitempty"`
    OutputCost         float64 `json:"outputCost,omitempty"`
    TotalCost          float64 `json:"totalCost,omitempty"`
}