Production-Ready Go Client for Langfuse
Comprehensive LLM observability with enterprise-grade reliability, asynchronous processing, and minimal performance overhead.
package main

import (
	"context"
	"log"

	"github.com/bdpiprava/GoLangfuse"
	"github.com/bdpiprava/GoLangfuse/types"
)

func main() {
	// Initialize from environment variables
	lf, err := langfuse.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	defer lf.Stop(ctx)

	// Track LLM interactions
	traceID := lf.AddEvent(ctx, &types.TraceEvent{
		Name:   "chat-completion",
		UserID: "user-123",
		Metadata: map[string]interface{}{
			"model": "gpt-4",
		},
	})
	log.Println("trace created:", traceID)
}
Key Features
Asynchronous Processing
Non-blocking event tracking with configurable worker goroutines and intelligent batching.
Retry Logic
Built-in exponential backoff retry mechanism with a configurable maximum number of attempts (see the sketch after this feature list).
Comprehensive Monitoring
Health checks, metrics collection, and performance tracking out of the box.
Production Ready
Enterprise-grade reliability with structured error handling and graceful shutdown.
Type Safe
Strongly typed event system with builder patterns for clean, maintainable code.
Configurable
Extensive configuration options with sensible defaults and environment variable support.
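For illustration, the sketch below shows the general shape of capped exponential backoff. It is not GoLangfuse's internal implementation; the attempt limit corresponds conceptually to the MaxRetries setting described under Configuration, and the snippet assumes the context, time, and fmt packages are imported.
// retryWithBackoff calls fn, retrying up to maxRetries additional times and
// doubling the wait between attempts. Illustrative sketch only.
func retryWithBackoff(ctx context.Context, maxRetries int, fn func() error) error {
	delay := 100 * time.Millisecond
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay *= 2 // exponential backoff
		}
	}
	return fmt.Errorf("request failed after %d attempts: %w", maxRetries+1, err)
}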
Quick Start
1. Installation
go get github.com/bdpiprava/GoLangfuse
2. Environment Setup
Set your Langfuse credentials:
export LANGFUSE_URL="https://cloud.langfuse.com"
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."
3. Basic Usage
package main
import (
"context"
"log"
"github.com/bdpiprava/GoLangfuse"
"github.com/bdpiprava/GoLangfuse/types"
)
func main() {
// Initialize client from environment
lf, err := langfuse.NewFromEnv()
if err != nil {
log.Fatal("Failed to initialize Langfuse:", err)
}
defer lf.Stop(context.Background())
ctx := context.Background()
// Create a trace for a user interaction
traceID := lf.AddEvent(ctx, &types.TraceEvent{
Name: "chat-completion",
UserID: "user-123",
Metadata: map[string]interface{}{
"session_id": "session-456",
"model": "gpt-4",
},
})
// Track LLM generation
lf.AddEvent(ctx, &types.GenerationEvent{
Name: "openai-completion",
TraceID: traceID,
Model: "gpt-4",
Input: map[string]interface{}{
"messages": []map[string]string{
{"role": "user", "content": "Hello, world!"},
},
},
Output: map[string]interface{}{
"content": "Hello! How can I help you today?",
},
Usage: &types.Usage{
InputTokens: 12,
OutputTokens: 8,
TotalTokens: 20,
},
Metadata: map[string]interface{}{
"temperature": 0.7,
},
})
log.Println("Events tracked successfully!")
}
API Reference
Core Interface
type Langfuse interface
Main interface for interacting with the Langfuse API.
type Langfuse interface {
AddEvent(ctx context.Context, event types.LangfuseEvent) *uuid.UUID
Stop(ctx context.Context) error
GetMetrics() Metrics
GetHealthStatus() HealthStatus
CheckHealth(ctx context.Context) HealthStatus
}
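Because Langfuse is an interface, application code can depend on it directly and swap in a fake during tests. A small illustrative example follows; the ChatService type is hypothetical and not part of the library.
// ChatService depends on the Langfuse interface rather than a concrete
// client, so a stub implementation can be injected in unit tests.
type ChatService struct {
	tracker langfuse.Langfuse
}

func NewChatService(tracker langfuse.Langfuse) *ChatService {
	return &ChatService{tracker: tracker}
}

func (s *ChatService) Handle(ctx context.Context, userID, message string) {
	// Record the interaction; AddEvent is non-blocking.
	s.tracker.AddEvent(ctx, &types.TraceEvent{
		Name:   "chat-message",
		UserID: userID,
		Input:  map[string]interface{}{"message": message},
	})
}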
Factory Functions
NewFromEnv() (Langfuse, error)
Creates a new Langfuse client using environment variables.
Environment Variables:
LANGFUSE_URL
- Langfuse server URL
LANGFUSE_PUBLIC_KEY
- Public API key
LANGFUSE_SECRET_KEY
- Secret API key
Returns:
Langfuse
- Configured client instance
error
- Configuration or validation error
lf, err := langfuse.NewFromEnv()
if err != nil {
return fmt.Errorf("failed to initialize: %w", err)
}
New(config *config.Langfuse) Langfuse
Creates a new Langfuse client with explicit configuration.
Parameters:
config
- Configuration struct with all settings
cfg := &config.Langfuse{
URL: "https://cloud.langfuse.com",
PublicKey: "pk-lf-...",
SecretKey: "sk-lf-...",
BatchSize: 20,
BatchTimeout: 10 * time.Second,
}
lf := langfuse.New(cfg)
NewWithClient(config *config.Langfuse, client *http.Client) Langfuse
Creates a client with custom HTTP client for advanced networking requirements.
Parameters:
config
- Configuration struct
client
- Custom HTTP client with desired settings
httpClient := &http.Client{
Timeout: 60 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 200,
},
}
lf := langfuse.NewWithClient(cfg, httpClient)
Core Methods
AddEvent(ctx context.Context, event types.LangfuseEvent) *uuid.UUID
Asynchronously tracks an event in Langfuse.
Parameters:
ctx
- Context for cancellation and timeouts
event
- Event implementing the LangfuseEvent interface
Returns:
*uuid.UUID
- Event ID for tracking and correlation
eventID := lf.AddEvent(ctx, &types.TraceEvent{
Name: "user-interaction",
UserID: "user-123",
})
Stop(ctx context.Context) error
Gracefully shuts down the client, flushing all pending events.
Parameters:
ctx
- Context with timeout for shutdown operation
Returns:
error
- Shutdown error if operation fails
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := lf.Stop(ctx); err != nil {
log.Printf("Shutdown error: %v", err)
}
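In a long-running service, Stop is usually wired to process shutdown so pending events are flushed before exit. A sketch using standard library signal handling (nothing here beyond os/signal, syscall, and the documented Stop method):
func runUntilSignal(lf langfuse.Langfuse) {
	// Wait for SIGINT/SIGTERM, then flush pending events before exiting.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := lf.Stop(ctx); err != nil {
		log.Printf("failed to flush events on shutdown: %v", err)
	}
}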
Monitoring Methods
GetMetrics() Metrics
Returns current performance metrics and statistics.
metrics := lf.GetMetrics()
fmt.Printf("Events processed: %d\n", metrics.EventsProcessed)
fmt.Printf("Success rate: %.2f%%\n", metrics.SuccessRate)
GetHealthStatus() HealthStatus
Returns current health status without performing checks.
health := lf.GetHealthStatus()
if health.Status != "healthy" {
log.Printf("Health issue: %s", health.Message)
}
CheckHealth(ctx context.Context) HealthStatus
Performs active health check against Langfuse API.
health := lf.CheckHealth(ctx)
fmt.Printf("API Status: %s\n", health.Status)
Configuration
Configuration Structure
type Langfuse struct {
// Authentication (Required)
URL string `envconfig:"LANGFUSE_URL" validate:"required"`
PublicKey string `envconfig:"LANGFUSE_PUBLIC_KEY" validate:"required"`
SecretKey string `envconfig:"LANGFUSE_SECRET_KEY" validate:"required"`
// Performance Tuning
NumberOfEventProcessor int `envconfig:"LANGFUSE_EVENT_PROCESSORS" default:"1"`
BatchSize int `envconfig:"LANGFUSE_BATCH_SIZE" default:"10"`
BatchTimeout time.Duration `envconfig:"LANGFUSE_BATCH_TIMEOUT" default:"5s"`
MaxRetries int `envconfig:"LANGFUSE_MAX_RETRIES" default:"3"`
// HTTP Settings
Timeout time.Duration `envconfig:"LANGFUSE_TIMEOUT" default:"30s"`
MaxIdleConns int `envconfig:"LANGFUSE_MAX_IDLE_CONNS" default:"100"`
MaxIdleConnsPerHost int `envconfig:"LANGFUSE_MAX_IDLE_CONNS_PER_HOST" default:"10"`
MaxConnsPerHost int `envconfig:"LANGFUSE_MAX_CONNS_PER_HOST" default:"100"`
IdleConnTimeout time.Duration `envconfig:"LANGFUSE_IDLE_CONN_TIMEOUT" default:"90s"`
// Debugging
Debug bool `envconfig:"LANGFUSE_DEBUG" default:"false"`
}
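The envconfig, default, and validate tags match the conventions of the kelseyhightower/envconfig and go-playground/validator packages. NewFromEnv performs this loading for you; the sketch below only illustrates how such tags are typically consumed and is not the library's internal code.
import (
	"github.com/go-playground/validator/v10"
	"github.com/kelseyhightower/envconfig"

	"github.com/bdpiprava/GoLangfuse/config"
)

func loadConfig() (*config.Langfuse, error) {
	var cfg config.Langfuse
	// Populate fields from LANGFUSE_* environment variables, applying defaults.
	if err := envconfig.Process("", &cfg); err != nil {
		return nil, err
	}
	// Enforce the `validate:"required"` tags.
	if err := validator.New().Struct(&cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}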
Configuration Options
Parameter | Environment Variable | Default | Description |
---|---|---|---|
URL | LANGFUSE_URL | Required | Langfuse server endpoint URL |
PublicKey | LANGFUSE_PUBLIC_KEY | Required | Public API key for authentication |
SecretKey | LANGFUSE_SECRET_KEY | Required | Secret API key for authentication |
NumberOfEventProcessor | LANGFUSE_EVENT_PROCESSORS | 1 | Number of concurrent worker goroutines |
BatchSize | LANGFUSE_BATCH_SIZE | 10 | Maximum events per batch request |
BatchTimeout | LANGFUSE_BATCH_TIMEOUT | 5s | Maximum time to wait before sending partial batch |
MaxRetries | LANGFUSE_MAX_RETRIES | 3 | Maximum retry attempts for failed requests |
Timeout | LANGFUSE_TIMEOUT | 30s | HTTP request timeout |
MaxIdleConns | LANGFUSE_MAX_IDLE_CONNS | 100 | Maximum idle HTTP connections |
Debug | LANGFUSE_DEBUG | false | Enable debug logging |
Configuration Examples
Environment Variables
# Required
export LANGFUSE_URL="https://cloud.langfuse.com"
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."
# Performance Tuning
export LANGFUSE_EVENT_PROCESSORS="3"
export LANGFUSE_BATCH_SIZE="20"
export LANGFUSE_BATCH_TIMEOUT="10s"
# High-Throughput Setup
export LANGFUSE_MAX_IDLE_CONNS="200"
export LANGFUSE_TIMEOUT="60s"
Programmatic Configuration
cfg := &config.Langfuse{
URL: "https://cloud.langfuse.com",
PublicKey: "pk-lf-...",
SecretKey: "sk-lf-...",
// Optimize for high throughput
NumberOfEventProcessor: 5,
BatchSize: 50,
BatchTimeout: 2 * time.Second,
// Production HTTP settings
Timeout: 60 * time.Second,
MaxIdleConns: 200,
MaxRetries: 5,
}
lf := langfuse.New(cfg)
Examples
Complete LLM Workflow
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/bdpiprava/GoLangfuse"
"github.com/bdpiprava/GoLangfuse/types"
)
func main() {
// Initialize client
lf, err := langfuse.NewFromEnv()
if err != nil {
log.Fatal("Failed to initialize:", err)
}
defer lf.Stop(context.Background())
ctx := context.Background()
// 1. Create a trace for the entire conversation
traceID := lf.AddEvent(ctx, &types.TraceEvent{
Name: "chat-conversation",
UserID: "user-123",
SessionID: "session-456",
Tags: []string{"chat", "customer-support"},
Metadata: map[string]interface{}{
"channel": "web",
"version": "1.0",
},
})
// 2. Track user input processing
spanID := lf.AddEvent(ctx, &types.SpanEvent{
Name: "input-processing",
TraceID: traceID,
Input: map[string]interface{}{
"user_message": "How do I reset my password?",
},
Metadata: map[string]interface{}{
"intent_detected": "password_reset",
"confidence": 0.95,
},
})
// 3. Track LLM generation
generationID := lf.AddEvent(ctx, &types.GenerationEvent{
Name: "openai-completion",
TraceID: traceID,
ParentID: spanID,
Model: "gpt-4",
ModelParameters: map[string]interface{}{
"temperature": 0.7,
"max_tokens": 500,
},
Input: map[string]interface{}{
"messages": []map[string]string{
{"role": "system", "content": "You are a helpful customer support assistant."},
{"role": "user", "content": "How do I reset my password?"},
},
},
Output: map[string]interface{}{
"content": "To reset your password, please follow these steps: 1. Go to the login page...",
},
Usage: &types.Usage{
InputTokens: 45,
OutputTokens: 120,
TotalTokens: 165,
},
StartTime: time.Now().Add(-2 * time.Second),
EndTime: time.Now(),
})
// 4. Track response quality
lf.AddEvent(ctx, &types.ScoreEvent{
Name: "response-quality",
TraceID: traceID,
ObservationID: generationID,
Value: 0.85,
Comment: "Helpful and accurate response",
Source: "user_feedback",
})
// 5. Update trace with final outcome
lf.AddEvent(ctx, &types.TraceEvent{
ID: traceID,
Output: map[string]interface{}{
"resolution_status": "resolved",
"user_satisfaction": "high",
},
EndTime: time.Now(),
})
fmt.Println("Complete workflow tracked successfully!")
}
Batch Processing
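// Note: UserRequest and processWithLLM below are application-specific placeholders, not part of GoLangfuse.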
func processBatch(lf langfuse.Langfuse, requests []UserRequest) {
ctx := context.Background()
// Create a trace for the batch operation
batchTraceID := lf.AddEvent(ctx, &types.TraceEvent{
Name: "batch-processing",
Metadata: map[string]interface{}{
"batch_size": len(requests),
"timestamp": time.Now(),
},
})
for i, req := range requests {
// Create span for each request
spanID := lf.AddEvent(ctx, &types.SpanEvent{
Name: fmt.Sprintf("process-request-%d", i),
TraceID: batchTraceID,
Input: map[string]interface{}{
"request_id": req.ID,
"user_id": req.UserID,
},
})
// Process with LLM
result := processWithLLM(req)
// Track generation
lf.AddEvent(ctx, &types.GenerationEvent{
Name: "batch-generation",
TraceID: batchTraceID,
ParentID: spanID,
Model: "gpt-3.5-turbo",
Input: map[string]interface{}{"prompt": req.Prompt},
Output: map[string]interface{}{"result": result},
Usage: &types.Usage{
InputTokens: req.TokenCount,
OutputTokens: result.TokenCount,
},
})
}
}
Error Handling & Monitoring
func monitoredOperation(lf langfuse.Langfuse) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Check health before operation
health := lf.CheckHealth(ctx)
if health.Status != "healthy" {
log.Printf("API health check failed: %s", health.Message)
return
}
// Track operation
traceID := lf.AddEvent(ctx, &types.TraceEvent{
Name: "monitored-operation",
})
// Simulate some work with error handling
err := performOperation()
if err != nil {
// Track error as a span
lf.AddEvent(ctx, &types.SpanEvent{
Name: "error-handling",
TraceID: traceID,
Level: types.LevelError,
StatusMessage: err.Error(),
Metadata: map[string]interface{}{
"error_type": "operation_failed",
"timestamp": time.Now(),
},
})
}
// Get current metrics
metrics := lf.GetMetrics()
log.Printf("Events processed: %d, Success rate: %.2f%%",
metrics.EventsProcessed, metrics.SuccessRate)
// Graceful shutdown with context
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := lf.Stop(shutdownCtx); err != nil {
log.Printf("Shutdown warning: %v", err)
}
}
Custom HTTP Client
func withCustomHTTPClient() {
// Create custom HTTP client for specific requirements
transport := &http.Transport{
MaxIdleConns: 200,
MaxIdleConnsPerHost: 20,
IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
// Custom proxy if needed
Proxy: http.ProxyFromEnvironment,
// Custom TLS config
TLSClientConfig: &tls.Config{
InsecureSkipVerify: false,
MinVersion: tls.VersionTLS12,
},
}
httpClient := &http.Client{
Transport: transport,
Timeout: 60 * time.Second,
}
cfg := &config.Langfuse{
URL: "https://cloud.langfuse.com",
PublicKey: os.Getenv("LANGFUSE_PUBLIC_KEY"),
SecretKey: os.Getenv("LANGFUSE_SECRET_KEY"),
BatchSize: 25,
}
lf := langfuse.NewWithClient(cfg, httpClient)
defer lf.Stop(context.Background())
// Use as normal
lf.AddEvent(context.Background(), &types.TraceEvent{
Name: "custom-client-trace",
})
}
Event Types
TraceEvent
Represents a complete user interaction or workflow.
type TraceEvent struct {
ID *uuid.UUID `json:"id,omitempty"`
Name string `json:"name" validate:"required"`
UserID string `json:"userId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
Version string `json:"version,omitempty"`
Release string `json:"release,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
Public bool `json:"public,omitempty"`
StartTime time.Time `json:"timestamp,omitempty"`
EndTime time.Time `json:"endTime,omitempty"`
}
GenerationEvent
Tracks LLM API calls with comprehensive metrics.
type GenerationEvent struct {
ID *uuid.UUID `json:"id,omitempty"`
TraceID *uuid.UUID `json:"traceId,omitempty"`
ParentID *uuid.UUID `json:"parentObservationId,omitempty"`
Name string `json:"name" validate:"required"`
StartTime time.Time `json:"startTime,omitempty"`
EndTime time.Time `json:"endTime,omitempty"`
CompletionTime time.Time `json:"completionStartTime,omitempty"`
Model string `json:"model,omitempty"`
ModelParameters map[string]interface{} `json:"modelParameters,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Usage *Usage `json:"usage,omitempty"`
Level Level `json:"level,omitempty"`
StatusMessage string `json:"statusMessage,omitempty"`
Version string `json:"version,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
PromptName string `json:"promptName,omitempty"`
PromptVersion string `json:"promptVersion,omitempty"`
}
SpanEvent
Tracks individual processing steps within a trace.
type SpanEvent struct {
ID *uuid.UUID `json:"id,omitempty"`
TraceID *uuid.UUID `json:"traceId,omitempty"`
ParentID *uuid.UUID `json:"parentObservationId,omitempty"`
Name string `json:"name" validate:"required"`
StartTime time.Time `json:"startTime,omitempty"`
EndTime time.Time `json:"endTime,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Level Level `json:"level,omitempty"`
StatusMessage string `json:"statusMessage,omitempty"`
Version string `json:"version,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
ScoreEvent
Tracks quality metrics and performance scores.
type ScoreEvent struct {
ID *uuid.UUID `json:"id,omitempty"`
TraceID *uuid.UUID `json:"traceId,omitempty"`
ObservationID *uuid.UUID `json:"observationId,omitempty"`
Name string `json:"name" validate:"required"`
Value float64 `json:"value" validate:"required"`
DataType DataType `json:"dataType,omitempty"`
Comment string `json:"comment,omitempty"`
Source string `json:"source,omitempty"`
ConfigID string `json:"configId,omitempty"`
}
Usage
Token usage and cost tracking.
type Usage struct {
InputTokens int `json:"input,omitempty"`
OutputTokens int `json:"output,omitempty"`
TotalTokens int `json:"total,omitempty"`
Unit string `json:"unit,omitempty"`
InputCost float64 `json:"inputCost,omitempty"`
OutputCost float64 `json:"outputCost,omitempty"`
TotalCost float64 `json:"totalCost,omitempty"`
}
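Most provider SDKs report token counts in their own shape; copying them into types.Usage keeps cost and usage tracking consistent. A hedged sketch using a hypothetical OpenAI-style usage struct (the field names are assumptions about your provider's response, and the Unit value is an assumption as well):
// providerUsage mirrors the usage block returned by many chat-completion APIs.
type providerUsage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

// toLangfuseUsage copies provider token counts into the GoLangfuse Usage type.
func toLangfuseUsage(u providerUsage) *types.Usage {
	return &types.Usage{
		InputTokens:  u.PromptTokens,
		OutputTokens: u.CompletionTokens,
		TotalTokens:  u.TotalTokens,
		Unit:         "TOKENS", // assumed unit label; adjust to your Langfuse setup
	}
}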