Blog Post

How to propagate OpenTelemetry trace headers over AWS Kinesis: Part 2

Published
March 20, 2024
#
 mins read
By 

in this blog post

In the first article of our series, we explored the importance of trace headers and the complexities involved in their propagation. Now, we shift from theory to practice.

This second installment will take you through a hands-on baseline scenario and our initial strategy of propagating the OpenTelemetry trace context in AWS Kinesis by using the PartitionKey parameter.

Baseline exploration

To begin, we'll examine the automated instrumentation and trace context propagation provided by the OpenTelemetry SDK, focusing on its default capabilities. Our test involves two interconnected services within AWS Kinesis: a producer, which generates and sends events, and a consumer, which receives and processes them. This initial scenario allows us to observe the natural behavior of trace propagation between the services without any alterations to the system.  

I kept the test scenario and configuration simple to focus on the main problem:

  • We have an AWS Kinesis stream with 2 shards
  • We have an app named event-producer  
  • The app produces events and sends them to the AWS Kinesis stream by AWS SDK.
  • Events are partitioned across shards by event group ID, so events under the same group are put into the same shard and processed sequentially.
  • The event-producer app is automatically instrumented and traced by OTEL Java agent (version 1.30.1).
  • We have an app named event-consumer
  • The app is an AWS Lambda function (otel-lambda-playground-kinesis-handler) and is triggered from the AWS Kinesis stream.
  • The batch size for the trigger was set to 1 to prevent multiple upstream trace link cases (the events are processed together here as a batch in the same invocation, but each event is put to the stream at different downstream traces in the event-producer application), which is the subject of another blog post. [Insert link]  
  • The event-consumer AWS Lambda function handler is wrapped and so automatically traced by OTEL Java AWS Lambda packages (opentelemetry-aws-lambda-core-1.0 and opentelemetry-aws-lambda-events-2.2)

When we run the test scenario, both the event-producer and event-consumer applications are automatically traced by OTEL SDK, but although they belong to the same flow, two different traces are created, independent of each other.

Image 1 - Trace of the event-producer application
Image 2 - Trace of the event-consumer application

1st Attempt - Propagate Trace Context Through “PartitionKey”

When we look at AWS Kinesis' Put Record request model, we see there are 6 parameters:

  • Data
  • ExplicitHashKey
  • PartitionKey
  • SequenceNumberForOrdering
  • StreamARN
  • StreamName  

However, we cannot use or change many parameters here to propagate the traceparent header:

  • We should not change Data among these parameters, because we have mentioned in previous sections what kind of problems may occur if we change this request body.
  • We cannot change parameters StreamName and StreamARN too, because these parameters specify which AWS Kinesis stream we will send the event to.
  • We cannot change the SequenceNumberForOrdering parameter either, because this time the order of the events we send from the same client to the same shard changes, and if we cannot give a monotonic increasing sequence number here, some records may be ignored without being processed. You can look at this question on AWS re:Post, which explains how this situation might happen with a simple example: https://repost.aws/questions/QUerTjydobQ3OL4gB3xQrREA/understanding-kinesis-sequence-numbers

Therefore, in our first attempt, we will use the PartitionKey parameter to propagate the traceparent header.  

For this, we set the traceparent header to the PartitionKey parameter in W3C context header format manually, as in the sample code block below:

private PutRecordRequest injectTraceHeader(PutRecordRequest request){ 
    if (!TRACE_CONTEXT_PROPAGATION_ENABLED) { 
        return request; 
    } 
    Span currentSpan = Span.current(); 
    if (currentSpan == null) { 
        return request; 
    } 
    SpanContext currentSpanContext = currentSpan.getSpanContext(); 
    if (currentSpanContext == null) { 
        return request; 
    } 

    PutRecordRequest.Builder requestBuilder = request.toBuilder(); 
    String traceParent = String.format("00-%s-%s-%s", 
            currentSpanContext.getTraceId(), 
            currentSpanContext.getSpanId(), 
            currentSpanContext.getTraceFlags().asHex()); 
    requestBuilder.partitionKey(traceParent);  

    return requestBuilder.build(); 
} 

When we do this, we can pass the traceparent header to the event-consumer AWS Lambda function within the Kinesis event via the PartitionKey parameter. But since the method we follow here is not a standard way, we need to manually extract the traceparent header from this PartitionKey parameter on the event-consumer function side and give it to the trace context. So that the trace on the event-consumer side can use the propagated trace id here and then both the event-producer and the event-consumer applications will be seen in the same trace.

... 


public class KinesisHandler extends TracingRequestHandler {
    private static final String TRACE_PARENT = "traceparent"; 
    private static final String TRACE_PARENT_PREFIX = "00-"; 
    
    ... 

    @Override 
    protected Void doHandleRequest(KinesisEvent event, Context context) {
    		... 
        return null; 
    } 

    @Override 
    protected Map extractHttpHeaders(KinesisEvent event) { 
        List records = event.getRecords(); 
        if (!records.isEmpty()) { 
            Map headers = new HashMap<>(); 
            KinesisEvent.KinesisEventRecord record = records.get(0); 
            String partitionKey = record.getKinesis().getPartitionKey(); 
            if (partitionKey != null  
                    && partitionKey.startsWith(TRACE_PARENT_PREFIX)) { 
                headers.put(TRACE_PARENT, partitionKey); 
            } 
            if (!headers.isEmpty()) { 
                return headers; 
            } 
        } 
        return super.extractHttpHeaders(event); 
    } 
} 

After making these changes and then running our test scenario again, we can see, as expected, that the event-producer and event-consumer applications, which were in separate traces in the previous test, are now included in the same trace because the trace ID is propagated via the PartitionKey parameter in the PutRecord request.

Image 3 - Trace of the event-producer and event-consumer applications with the same trace ID (broker partitioning)

We were able to propagate the trace ID with this method, but this approach had the following side effect. Since we set the PartitionKey parameter with the traceparent header, the events are now randomly distributed to the shards according to the trace parent header rather than the event group id. This changes the business logic in the application. In other words, if there is a logic on the event consumer side that requires events belonging to the same group to be put into the same shard and be processed sequentially rather than concurrently, this logic is broken now.  

Image 4 - event-consumer function invocation logs for processing of the 1st event with group ID group-1  


Image 5 - event-consumer function invocation logs for processing of the 2nd event with group id group-1

As seen from the AWS CloudWatch logs of the event-consumer function invocations, events with the same group ID are partitioned into different shards because while injecting the transparent header, we replaced the PartitionKey parameter with the transparent header value and so events are partitioned randomly across Kinesis shards based on generated trace ID (and span ID). and this leads to concurrent processing of events in the same group (having the same group ID). Because each event is assigned to different shards and each shard is processed concurrently by different AWS Lambda function microVM instances. As a result, this behavior breaks our business logic in the event-consumer function because we expect that events in the same group are processed sequentially by checking previously processed events in the same group ID and parallel processing of events in the same group might result in race conditions.

Part 2 Conclusion

Our exploration has revealed that while the initial strategy of using the PartitionKey for trace context propagation seemed promising, it inadvertently disrupted the event sequencing logic critical to our application's business requirements. This underscores a pivotal challenge in distributed tracing: the need to balance traceability with the functional integrity of our systems.

In our next and final post in this series, we will address this delicate balance by exploring alternative solutions that aim to preserve both the order of events and the clarity of tracing.  

In the first article of our series, we explored the importance of trace headers and the complexities involved in their propagation. Now, we shift from theory to practice.

This second installment will take you through a hands-on baseline scenario and our initial strategy of propagating the OpenTelemetry trace context in AWS Kinesis by using the PartitionKey parameter.

Baseline exploration

To begin, we'll examine the automated instrumentation and trace context propagation provided by the OpenTelemetry SDK, focusing on its default capabilities. Our test involves two interconnected services within AWS Kinesis: a producer, which generates and sends events, and a consumer, which receives and processes them. This initial scenario allows us to observe the natural behavior of trace propagation between the services without any alterations to the system.  

I kept the test scenario and configuration simple to focus on the main problem:

  • We have an AWS Kinesis stream with 2 shards
  • We have an app named event-producer  
  • The app produces events and sends them to the AWS Kinesis stream by AWS SDK.
  • Events are partitioned across shards by event group ID, so events under the same group are put into the same shard and processed sequentially.
  • The event-producer app is automatically instrumented and traced by OTEL Java agent (version 1.30.1).
  • We have an app named event-consumer
  • The app is an AWS Lambda function (otel-lambda-playground-kinesis-handler) and is triggered from the AWS Kinesis stream.
  • The batch size for the trigger was set to 1 to prevent multiple upstream trace link cases (the events are processed together here as a batch in the same invocation, but each event is put to the stream at different downstream traces in the event-producer application), which is the subject of another blog post. [Insert link]  
  • The event-consumer AWS Lambda function handler is wrapped and so automatically traced by OTEL Java AWS Lambda packages (opentelemetry-aws-lambda-core-1.0 and opentelemetry-aws-lambda-events-2.2)

When we run the test scenario, both the event-producer and event-consumer applications are automatically traced by OTEL SDK, but although they belong to the same flow, two different traces are created, independent of each other.

Image 1 - Trace of the event-producer application
Image 2 - Trace of the event-consumer application

1st Attempt - Propagate Trace Context Through “PartitionKey”

When we look at AWS Kinesis' Put Record request model, we see there are 6 parameters:

  • Data
  • ExplicitHashKey
  • PartitionKey
  • SequenceNumberForOrdering
  • StreamARN
  • StreamName  

However, we cannot use or change many parameters here to propagate the traceparent header:

  • We should not change Data among these parameters, because we have mentioned in previous sections what kind of problems may occur if we change this request body.
  • We cannot change parameters StreamName and StreamARN too, because these parameters specify which AWS Kinesis stream we will send the event to.
  • We cannot change the SequenceNumberForOrdering parameter either, because this time the order of the events we send from the same client to the same shard changes, and if we cannot give a monotonic increasing sequence number here, some records may be ignored without being processed. You can look at this question on AWS re:Post, which explains how this situation might happen with a simple example: https://repost.aws/questions/QUerTjydobQ3OL4gB3xQrREA/understanding-kinesis-sequence-numbers

Therefore, in our first attempt, we will use the PartitionKey parameter to propagate the traceparent header.  

For this, we set the traceparent header to the PartitionKey parameter in W3C context header format manually, as in the sample code block below:

private PutRecordRequest injectTraceHeader(PutRecordRequest request){ 
    if (!TRACE_CONTEXT_PROPAGATION_ENABLED) { 
        return request; 
    } 
    Span currentSpan = Span.current(); 
    if (currentSpan == null) { 
        return request; 
    } 
    SpanContext currentSpanContext = currentSpan.getSpanContext(); 
    if (currentSpanContext == null) { 
        return request; 
    } 

    PutRecordRequest.Builder requestBuilder = request.toBuilder(); 
    String traceParent = String.format("00-%s-%s-%s", 
            currentSpanContext.getTraceId(), 
            currentSpanContext.getSpanId(), 
            currentSpanContext.getTraceFlags().asHex()); 
    requestBuilder.partitionKey(traceParent);  

    return requestBuilder.build(); 
} 

When we do this, we can pass the traceparent header to the event-consumer AWS Lambda function within the Kinesis event via the PartitionKey parameter. But since the method we follow here is not a standard way, we need to manually extract the traceparent header from this PartitionKey parameter on the event-consumer function side and give it to the trace context. So that the trace on the event-consumer side can use the propagated trace id here and then both the event-producer and the event-consumer applications will be seen in the same trace.

... 


public class KinesisHandler extends TracingRequestHandler {
    private static final String TRACE_PARENT = "traceparent"; 
    private static final String TRACE_PARENT_PREFIX = "00-"; 
    
    ... 

    @Override 
    protected Void doHandleRequest(KinesisEvent event, Context context) {
    		... 
        return null; 
    } 

    @Override 
    protected Map extractHttpHeaders(KinesisEvent event) { 
        List records = event.getRecords(); 
        if (!records.isEmpty()) { 
            Map headers = new HashMap<>(); 
            KinesisEvent.KinesisEventRecord record = records.get(0); 
            String partitionKey = record.getKinesis().getPartitionKey(); 
            if (partitionKey != null  
                    && partitionKey.startsWith(TRACE_PARENT_PREFIX)) { 
                headers.put(TRACE_PARENT, partitionKey); 
            } 
            if (!headers.isEmpty()) { 
                return headers; 
            } 
        } 
        return super.extractHttpHeaders(event); 
    } 
} 

After making these changes and then running our test scenario again, we can see, as expected, that the event-producer and event-consumer applications, which were in separate traces in the previous test, are now included in the same trace because the trace ID is propagated via the PartitionKey parameter in the PutRecord request.

Image 3 - Trace of the event-producer and event-consumer applications with the same trace ID (broker partitioning)

We were able to propagate the trace ID with this method, but this approach had the following side effect. Since we set the PartitionKey parameter with the traceparent header, the events are now randomly distributed to the shards according to the trace parent header rather than the event group id. This changes the business logic in the application. In other words, if there is a logic on the event consumer side that requires events belonging to the same group to be put into the same shard and be processed sequentially rather than concurrently, this logic is broken now.  

Image 4 - event-consumer function invocation logs for processing of the 1st event with group ID group-1  


Image 5 - event-consumer function invocation logs for processing of the 2nd event with group id group-1

As seen from the AWS CloudWatch logs of the event-consumer function invocations, events with the same group ID are partitioned into different shards because while injecting the transparent header, we replaced the PartitionKey parameter with the transparent header value and so events are partitioned randomly across Kinesis shards based on generated trace ID (and span ID). and this leads to concurrent processing of events in the same group (having the same group ID). Because each event is assigned to different shards and each shard is processed concurrently by different AWS Lambda function microVM instances. As a result, this behavior breaks our business logic in the event-consumer function because we expect that events in the same group are processed sequentially by checking previously processed events in the same group ID and parallel processing of events in the same group might result in race conditions.

Part 2 Conclusion

Our exploration has revealed that while the initial strategy of using the PartitionKey for trace context propagation seemed promising, it inadvertently disrupted the event sequencing logic critical to our application's business requirements. This underscores a pivotal challenge in distributed tracing: the need to balance traceability with the functional integrity of our systems.

In our next and final post in this series, we will address this delicate balance by exploring alternative solutions that aim to preserve both the order of events and the clarity of tracing.  

This is some text inside of a div block.

You might also like

Blog post

Catchpoint Expands Observability Network to Barcelona: A Growing Internet Hub

Blog post

Demystifying API Monitoring and Testing with IPM

Blog post

The Need for Speed: Highlights from IBM and Catchpoint’s Global DNS Performance Study