讓我們討論一個重要問題:如果發生問題,我們該如何監控我們的服務?
一方面,我們有警報功能的Prometheus和提供儀表板等其他有用功能的Kibana。我們也知道如何收集日誌 — ELK堆棧是我們的首選解決方案。然而,僅僅進行簡單的記錄並不總是足夠的:它並不能提供對請求在整個組件生態系統中的旅程的整體視圖。
您可以在這裡找到有關ELK的更多信息。
但是,如果我們想要可視化請求呢?如果我們需要關聯系統之間傳輸的請求呢?這適用於微服務和單體應用 — 服務的數量多寡並不重要;重要的是我們如何管理它們的延遲。
事實上,每個用戶請求可能需要通過一整個獨立服務、數據庫、消息隊列和外部API的鏈條。
在這種複雜環境中,要精確找出延遲發生的位置、識別哪部分鏈條作為性能瓶頸以及在發生故障時快速找到故障原因變得非常困難。
為了有效應對這些挑戰,我們需要一個集中的、一致的系統來收集遙測數據 — 追蹤、指標和日誌。這就是OpenTelemetry和Jaeger派上用場的地方。
讓我們來看看基本概念
我們需要理解兩個主要術語:
追蹤 ID
追蹤 ID 是一個 16 字節的標識符,通常表示為一個 32 字符的十六進制字符串。它在追蹤開始時自動生成,並在特定請求創建的所有跨度中保持不變。這使得我們能夠輕鬆查看請求如何在系統中的不同服務或組件之間傳遞。
跨度 ID
每個追蹤中的個別操作都會獲得自己的跨度 ID,通常是一個隨機生成的 64 位值。跨度共享相同的追蹤 ID,但每個跨度都有一個獨特的跨度 ID,因此您可以精確確定每個跨度所代表的工作流程的哪一部分(例如數據庫查詢或對另一個微服務的調用)。
它們是如何相關的?
追蹤 ID 和 跨度 ID相輔相成。
當請求被啟動時,會生成一個追蹤 ID 並傳遞給所有相關服務。每個服務則創建一個與追蹤 ID 相關聯的唯一跨度 ID,讓您能夠可視化請求從開始到結束的完整生命週期。
那麼,為什麼不直接使用 Jaeger 呢?? 為什麼我們需要 OpenTelemetry (OTEL) 和它的所有規範呢?? 這是一個很好的問題!讓我們一步一步來分析。
在這裡了解更多關於 Jaeger 的資訊 這裡。
簡言之
- Jaeger是一個用於存儲和可視化分佈式跟蹤的系統。它收集、存儲、搜索和顯示數據,展示請求在您的服務中如何“傳播”。
- OpenTelemetry(OTEL)是一個標準(以及一組庫),用於從應用程序和基礎設施中收集遙測數據(跟蹤、指標、日誌)。它並不與任何單一可視化工具或後端綁定。
簡單來說:
- OTEL就像一種遙測收集的“通用語言”和庫集合。
- Jaeger是用於查看和分析分佈式跟蹤的後端和用戶界面。
為什麼我們需要 OTEL 如果我們已經有 Jaeger?
1. 集合的單一標準
過去存在著像 OpenTracing 和 OpenCensus 這樣的項目。OpenTelemetry 將這些收集指標和跟蹤的方法統一為一個通用標準。
2. 易於集成
您可以在 Go(或其他語言)中編寫代碼,添加 OTEL 库以自動注入攔截器和跨度,然後就完成了。之後,您想將數據發送到哪裡都沒有關係 — Jaeger、Tempo、Zipkin、Datadog、自定義後端 — OpenTelemetry 會處理好一切。您只需要更換導出器。
3. 不僅僅是跟蹤
OpenTelemetry 匶括跟蹤,但也處理指標和日誌。您最終將獲得一個用於所有遙測需求的單一工具組,而不僅僅是跟蹤。
4. Jaeger 作為後端
Jaeger 是一個出色的選擇,如果您主要關注分佈式追蹤可視化。但它不會自動提供跨語言儀器。另一方面,OpenTelemetry 為您提供了一種標準化的方式來收集數據,然後您可以決定將數據發送到哪裡(包括 Jaeger)。
實際上,它們經常一起工作:一起工作:
您的應用程序使用 OpenTelemetry → 通過 OTLP 協議進行通信 → 進入 OpenTelemetry 收集器(HTTP 或 grpc) → 導出到 Jaeger 進行可視化。
技術部分
系統設計(稍微)
讓我們快速勾勒出幾個服務,將執行以下操作:
- 購買服務 – 處理付款並在 MongoDB 中記錄它
- 具有 Debezium 的 CDC – 監聽 MongoDB 表中的更改並將其發送到 Kafka
- 購買處理器 – 從 Kafka 消費消息並調用 Auth 服務查找
user_id
進行驗證 - Auth 服務 – 簡單的用戶服務
總結:
- 3 個 Go 服務
- Kafka
- CDC(Debezium)
- MongoDB
代碼部分
讓我們從基礎設施開始。為了將所有內容結合成一個系統,我們將創建一個大型的 Docker Compose 文件。我們將從設置遙測開始。
注意:所有的代碼都可以通過文章末尾的鏈接獲取,包括基礎設施。
services
jaeger
image jaegertracing/all-in-one1.52
ports
"6831:6831/udp" # UDP port for the Jaeger agent
"16686:16686" # Web UI
"14268:14268" # HTTP port for spans
networks
internal
prometheus
image prom/prometheus latest
volumes
./prometheus.yml:/etc/prometheus/prometheus.yml:ro
ports
"9090:9090"
depends_on
kafka
jaeger
otel-collector
command
--config.file=/etc/prometheus/prometheus.yml
networks
internal
otel-collector
image otel/opentelemetry-collector-contrib0.91.0
command'--config=/etc/otel-collector.yaml'
ports
"4317:4317" # OTLP gRPC receiver
volumes
./otel-collector.yaml:/etc/otel-collector.yaml
depends_on
jaeger
networks
internal
我們還將配置收集器 – 這個組件負責收集遙測數據。
在這裡,我們選擇了使用 gRPC 進行數據傳輸,這意味著通信將在 HTTP/2 上進行:
receivers
# Add the OTLP receiver listening on port 4317.
otlp
protocols
grpc
endpoint"0.0.0.0:4317"
processors
batch
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/memorylimiterprocessor
memory_limiter
check_interval 1s
limit_percentage80
spike_limit_percentage15
extensions
health_check
exporters
otlp
endpoint"jaeger:4317"
tls
insecuretrue
prometheus
endpoint 0.0.0.09090
debug
verbosity detailed
service
extensions health_check
pipelines
traces
receivers otlp
processors memory_limiter batch
exporters otlp
metrics
receivers otlp
processors memory_limiter batch
exporters prometheus
確保根據需要調整任何地址,這樣基本配置就完成了。
我們已經知道 OpenTelemetry(OTEL)使用了兩個關鍵概念 – 追蹤 ID和跨度 ID – 這有助於在分佈式系統中跟踪和監控請求。
代碼實現
現在,讓我們看看如何在您的 Go 代碼中使其正常工作。我們需要以下導入:
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
然後,在應用程序啟動時,在main()
中添加一個初始化跟踪器的函數:
func InitTracer(ctx context.Context) func() {
exp, err := otlptrace.New(
ctx,
otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint(endpoint),
otlptracegrpc.WithInsecure(),
),
)
if err != nil {
log.Fatalf("failed to create OTLP trace exporter: %v", err)
}
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String("auth-service"),
semconv.ServiceVersionKey.String("1.0.0"),
semconv.DeploymentEnvironmentKey.String("stg"),
),
)
if err != nil {
log.Fatalf("failed to create resource: %v", err)
}
tp := trace.NewTracerProvider(
trace.WithBatcher(exp),
trace.WithResource(res),
)
otel.SetTracerProvider(tp)
return func() {
err := tp.Shutdown(ctx)
if err != nil {
log.Printf("error shutting down tracer provider: %v", err)
}
}
}
有了跟踪設置,我們只需要在代碼中放置跨度來跟踪調用。例如,如果我們想測量數據庫調用(因為這通常是我們尋找性能問題的第一個地方),我們可以這樣寫:
tracer := otel.Tracer("auth-service")
ctx, span := tracer.Start(ctx, "GetUserInfo")
defer span.End()
tracedLogger := logging.AddTraceContextToLogger(ctx)
tracedLogger.Info("find user info",
zap.String("operation", "find user"),
zap.String("username", username),
)
user, err := s.userRepo.GetUserInfo(ctx, username)
if err != nil {
s.logger.Error(errNotFound)
span.RecordError(err)
span.SetStatus(otelCodes.Error, "Failed to fetch user info")
return nil, status.Errorf(grpcCodes.NotFound, errNotFound, err)
}
span.SetStatus(otelCodes.Ok, "User info retrieved successfully")
我們在服務層有追蹤 – 太好了!但我們還可以更深入,對數據庫層進行儀器化:
func (r *UserRepository) GetUserInfo(ctx context.Context, username string) (*models.User, error) {
tracer := otel.Tracer("auth-service")
ctx, span := tracer.Start(ctx, "UserRepository.GetUserInfo",
trace.WithAttributes(
attribute.String("db.statement", query),
attribute.String("db.user", username),
),
)
defer span.End()
var user models.User
// Some code that queries the DB...
// err := doDatabaseCall()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Failed to execute query")
return nil, fmt.Errorf("failed to fetch user info: %w", err)
}
span.SetStatus(codes.Ok, "Query executed successfully")
return &user, nil
}
現在,我們可以完整地查看請求的過程。前往 Jaeger UI,在auth-service
下查詢最後的 20 條追蹤,您將看到所有跨度以及它們如何在一個地方連接。
現在一切都是明顯的。如果您需要,可以將整個查詢包含在標籤中。但請記住,不要使您的遙測負荷過重 — 故意添加數據。我只是展示了可能的情況,但通常不建議這樣包含完整查詢。
gRPC 客戶端-服務端
如果您想要查看跨兩個 gRPC 服務的追踪,這是非常簡單的。您只需要添加庫中提供的即用攔截器。例如,在服務端:
server := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterAuthServiceServer(server, authService)
在客戶端,代碼同樣簡潔:
shutdown := tracing.InitTracer(ctx)
defer shutdown()
conn, err := grpc.Dial(
"auth-service:50051",
grpc.WithInsecure(),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
if err != nil {
logger.Fatal("error", zap.Error(err))
}
就是這樣!確保您的導出器配置正確,當客戶端調用服務端時,您將看到一個單一的追踪 ID橫跨這些服務記錄下來。
處理 CDC 事件和追踪
想要處理 CDC 中的事件嗎?一個簡單的方法是將追踪 ID 嵌入 MongoDB 存儲的對象中。這樣,當 Debezium 捕獲更改並將其發送到 Kafka 時,追踪 ID 已經成為記錄的一部分。
例如,如果您正在使用 MongoDB,可以這樣做:
func (r *mongoPurchaseRepo) SavePurchase(ctx context.Context, purchase entity.Purchase) error {
span := r.handleTracing(ctx, purchase)
defer span.End()
// Insert the record into MongoDB, including the current span's Trace ID
_, err := r.collection.InsertOne(ctx, bson.M{
"_id": purchase.ID,
"user_id": purchase.UserID,
"username": purchase.Username,
"amount": purchase.Amount,
"currency": purchase.Currency,
"payment_method": purchase.PaymentMethod,
// ...
"trace_id": span.SpanContext().TraceID().String(),
})
return err
}
然後 Debezium 接收此對象(包括trace_id
)並將其發送到 Kafka。在消費者端,您只需解析傳入消息,提取trace_id
,並將其合併到您的追踪上下文中:
// If we find a Trace ID in the payload, attach it to the context
newCtx := ctx
if traceID != "" {
log.Printf("Found Trace ID: %s", traceID)
newCtx = context.WithValue(ctx, "trace-id", traceID)
}
// Create a new span
tracer := otel.Tracer("purchase-processor")
newCtx, span := tracer.Start(newCtx, "handler.processPayload")
defer span.End()
if traceID != "" {
span.SetAttributes(
attribute.String("trace.id", traceID),
)
}
// Parse the "after" field into a Purchase struct...
var purchase model.Purchase
if err := mapstructure.Decode(afterDoc, &purchase); err != nil {
log.Printf("Failed to map 'after' payload to Purchase struct: %v", err)
return err
}
// If we find a Trace ID in the payload, attach it to the context
newCtx := ctx
if traceID != "" {
log.Printf("Found Trace ID: %s", traceID)
newCtx = context.WithValue(ctx, "trace-id", traceID)
}
// Create a new span
tracer := otel.Tracer("purchase-processor")
newCtx, span := tracer.Start(newCtx, "handler.processPayload")
defer span.End()
if traceID != "" {
span.SetAttributes(
attribute.String("trace.id", traceID),
)
}
// Parse the "after" field into a Purchase struct...
var purchase model.Purchase
if err := mapstructure.Decode(afterDoc, &purchase); err != nil {
log.Printf("Failed to map 'after' payload to Purchase struct: %v", err)
return err
}
另一種方法:使用 Kafka Headers
有時,在 Kafka 標頭中存儲 Trace ID 可能比在有效負載本身中更容易。對於 CDC 工作流程,這可能無法直接使用 – Debezium 可能會限制要添加到標頭中的內容。但如果您控制生成者端(或者使用標準的 Kafka 生產者),您可以像這樣進行操作:
將 Trace ID 注入標頭中
// saramaHeadersCarrier is a helper to set/get headers in a Sarama message.
type saramaHeadersCarrier *[]sarama.RecordHeader
func (c saramaHeadersCarrier) Get(key string) string {
for _, h := range *c {
if string(h.Key) == key {
return string(h.Value)
}
}
return ""
}
func (c saramaHeadersCarrier) Set(key string, value string) {
*c = append(*c, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(value),
})
}
// Before sending a message to Kafka:
func produceMessageWithTraceID(ctx context.Context, producer sarama.SyncProducer, topic string, value []byte) error {
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID().String()
headers := make([]sarama.RecordHeader, 0)
carrier := saramaHeadersCarrier(&headers)
carrier.Set("trace-id", traceID)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(value),
Headers: headers,
}
_, _, err := producer.SendMessage(msg)
return err
}
在消費者端提取 Trace ID
for message := range claim.Messages() {
// Extract the trace ID from headers
var traceID string
for _, hdr := range message.Headers {
if string(hdr.Key) == "trace-id" {
traceID = string(hdr.Value)
}
}
// Now continue your normal tracing workflow
if traceID != "" {
log.Printf("Found Trace ID in headers: %s", traceID)
// Attach it to the context or create a new span with this info
}
}
根據您的用例和 CDC 管道設置的方式,您可以選擇最適合的方法:
- 將 Trace ID 嵌入數據庫記錄,這樣它就可以通過 CDC 自然流動。
- 使用 Kafka 標頭,如果您對生成者端有更多控制權,或者想要避免膨脹消息有效負載。
無論哪種方式,您都可以確保跨多個服務保持追踪的一致性 – 即使事件通過 Kafka 和 Debezium 非同步處理。
結論
使用 OpenTelemetry 和 Jaeger 可提供詳細的請求跟踪,幫助您準確找出分佈式系統中發生延遲的原因和位置。
添加 Prometheus 可通過指標來完整呈現整個情況 – 這些是性能和穩定性的關鍵指標。這些工具共同構成了一個全面的可觀察性堆棧,實現更快的問題檢測和解決、性能優化以及整體系統的可靠性。
我可以說,這種方法顯著加快了在微服務環境中進行故障排除的速度,這也是我們在項目中實施的第一步。
鏈接
Source:
https://dzone.com/articles/control-services-otel-jaeger-prometheus