From 03edb9bb01a9eafef8cb17c8f76bad982605d97a Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 25 Jun 2026 11:05:05 +0200 Subject: [PATCH] fix: Add asyncio task context propagation to env vars Signed-off-by: Cagri Yonca --- src/instana/options.py | 141 ++++++++++++++++++++++------------------- tests/test_options.py | 17 +++++ 2 files changed, 92 insertions(+), 66 deletions(-) diff --git a/src/instana/options.py b/src/instana/options.py index 480f68fa..d8b046e7 100644 --- a/src/instana/options.py +++ b/src/instana/options.py @@ -14,9 +14,12 @@ - GCROptions - Options class for Google cloud Run. Holds settings specific to GCR. """ +from __future__ import annotations + import logging import os -from typing import Any, Dict, Sequence, Tuple +from collections.abc import Sequence +from typing import Any from instana.configurator import config from instana.log import logger @@ -41,7 +44,7 @@ class BaseOptions(object): """Base class for all option classes. Holds items common to all""" - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: self.debug = False self.log_level = logging.WARN self.service_name = determine_service_name() @@ -115,6 +118,11 @@ def set_trace_configurations(self) -> None: "trace_correlation", True ) + if "INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION" in os.environ: + config["asyncio_task_context_propagation"]["enabled"] = is_truthy( + os.environ["INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION"] + ) + self.set_disable_trace_configurations() self.set_stack_trace_configurations() self.set_span_filter_configurations() @@ -319,7 +327,7 @@ def is_span_disabled(self, category=None, span_type=None) -> bool: # Default: not disabled return False - def get_stack_trace_config(self, span_name: str) -> Tuple[str, int]: + def get_stack_trace_config(self, span_name: str) -> tuple[str, int]: """ Get stack trace configuration for a specific span type. Technology-specific configuration overrides global configuration. @@ -357,7 +365,7 @@ class StandardOptions(BaseOptions): DEFAULT_POLL_RATE = 1 MAX_POLL_RATE = 5 - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: super(StandardOptions, self).__init__() self.agent_host = os.environ.get("INSTANA_AGENT_HOST", self.AGENT_DEFAULT_HOST) @@ -367,7 +375,7 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: if not isinstance(self.agent_port, int): self.agent_port = int(self.agent_port) - def set_secrets(self, secrets: Dict[str, Any]) -> None: + def set_secrets(self, secrets: dict[str, str | list[str]]) -> None: """ Set the secret option from the agent config. @param secrets: dictionary of secrets @@ -376,7 +384,7 @@ def set_secrets(self, secrets: Dict[str, Any]) -> None: self.secrets_matcher = secrets["matcher"] self.secrets_list = secrets["list"] - def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None: + def set_extra_headers(self, extra_headers: list[str]) -> None: """ Set the extra headers option from the agent config, which uses the legacy configuration setting. @param extra_headers: dictionary of headers @@ -390,41 +398,17 @@ def set_extra_headers(self, extra_headers: Dict[str, Any]) -> None: f"Will also capture these custom headers: {self.extra_http_headers}" ) - def set_tracing(self, tracing: Dict[str, Any]) -> None: + def set_tracing(self, tracing: dict[str, Any]) -> None: """ Set tracing options from the agent config. @param tracing: tracing configuration dictionary @return: None """ if "filter" in tracing and not self._has_high_priority_span_filter_source(): - parsed = parse_filter_rules(tracing["filter"]) - for policy in ("exclude", "include"): - rules = parsed.get(policy, []) - if rules: - if policy not in self.span_filters: - self.span_filters[policy] = [] - self.span_filters[policy].extend(rules) + self._apply_agent_filter_config(tracing["filter"]) if "kafka" in tracing: - if ( - "INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ - and not ( - isinstance(config.get("tracing"), dict) - and "kafka" in config["tracing"] - ) - and "trace-correlation" in tracing["kafka"] - ): - self.kafka_trace_correlation = is_truthy( - tracing["kafka"].get("trace-correlation", True) - ) - - if ( - "header-format" in tracing["kafka"] - and tracing["kafka"]["header-format"] == "binary" - ): - logger.warning( - "Binary header format for Kafka is deprecated. Please use string header format." - ) + self._apply_agent_kafka_config(tracing["kafka"]) if "extra-http-headers" in tracing: self.extra_http_headers = tracing["extra-http-headers"] @@ -436,6 +420,32 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None: # Handle stack trace configuration from agent config self.set_stack_trace_from_agent(tracing) + def _apply_agent_filter_config(self, filter_config: dict[str, Any]) -> None: + """Apply span filter rules from agent config.""" + parsed = parse_filter_rules(filter_config) + for policy in ("exclude", "include"): + rules = parsed.get(policy, []) + if rules: + if policy not in self.span_filters: + self.span_filters[policy] = [] + self.span_filters[policy].extend(rules) + + def _apply_agent_kafka_config(self, kafka_config: dict[str, str | bool]) -> None: + """Apply Kafka tracing configuration from agent config.""" + no_env_override = "INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ + no_code_override = not ( + isinstance(config.get("tracing"), dict) and "kafka" in config["tracing"] + ) + if no_env_override and no_code_override and "trace-correlation" in kafka_config: + self.kafka_trace_correlation = is_truthy( + kafka_config.get("trace-correlation", True) + ) + + if kafka_config.get("header-format") == "binary": + logger.warning( + "Binary header format for Kafka is deprecated. Please use string header format." + ) + def _has_high_priority_span_filter_source(self) -> bool: """Return True if a higher-priority span filter source (env var, YAML, or in-code config) has already been configured, in which case the agent-provided filter should be ignored.""" @@ -469,7 +479,7 @@ def _should_apply_agent_global_config(self) -> bool: return not (has_env_vars or has_yaml_config or has_in_code_config) def _apply_agent_global_stack_trace_config( - self, global_config: Dict[str, Any] + self, global_config: dict[str, Any] ) -> None: """Apply global stack trace configuration from agent config.""" if "stack-trace" in global_config and ( @@ -486,7 +496,7 @@ def _apply_agent_global_stack_trace_config( ): self.stack_trace_length = validated_length - def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None: + def _apply_agent_tech_stack_trace_config(self, tracing: dict[str, Any]) -> None: """Apply technology-specific stack trace configuration from agent config.""" for tech_name, tech_config in tracing.items(): if tech_name == "global" or not isinstance(tech_config, dict): @@ -502,7 +512,7 @@ def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None: if tech_stack_config: self.stack_trace_technology_config[tech_name] = tech_stack_config - def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None: + def set_stack_trace_from_agent(self, tracing: dict[str, Any]) -> None: """ Set stack trace configuration from agent config (configuration.yaml). Only applies if not already set by higher priority sources. @@ -517,7 +527,7 @@ def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None: if not self.stack_trace_technology_config: self._apply_agent_tech_stack_trace_config(tracing) - def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None: + def set_disable_tracing(self, tracing_config: Sequence[dict[str, Any]]) -> None: # The precedence is as follows: # environment variables > in-code (local) config > agent config (configuration.yaml) if ( @@ -533,7 +543,7 @@ def set_disable_tracing(self, tracing_config: Sequence[Dict[str, Any]]) -> None: self.disabled_spans.extend(disabled_spans) self.enabled_spans.extend(enabled_spans) - def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None: + def set_poll_rate(self, plugin_config: dict[str, Any]) -> None: """Set poll rate from agent plugin configuration.""" poll_rate_value = plugin_config.get("poll_rate") if poll_rate_value is None: @@ -561,7 +571,7 @@ def set_poll_rate(self, plugin_config: Dict[str, Any]) -> None: ) self.poll_rate = self.DEFAULT_POLL_RATE - def set_from(self, res_data: Dict[str, Any]) -> None: + def set_from(self, res_data: dict[str, Any]) -> None: """ Set the source identifiers given to use by the Instana Host agent. @param res_data: source identifiers provided as announce response @@ -591,7 +601,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None: class ServerlessOptions(BaseOptions): """Base class for serverless environments. Holds settings common to all serverless environments.""" - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: super(ServerlessOptions, self).__init__() self.agent_key = os.environ.get("INSTANA_AGENT_KEY", None) @@ -601,16 +611,10 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: if self.endpoint_url is not None and self.endpoint_url[-1] == "/": self.endpoint_url = self.endpoint_url[:-1] - if "INSTANA_DISABLE_CA_CHECK" in os.environ: - self.ssl_verify = False - else: - self.ssl_verify = True + self.ssl_verify = "INSTANA_DISABLE_CA_CHECK" not in os.environ proxy = os.environ.get("INSTANA_ENDPOINT_PROXY", None) - if proxy is None: - self.endpoint_proxy = {} - else: - self.endpoint_proxy = {"https": proxy} + self.endpoint_proxy = {"https": proxy} if proxy else {} timeout_in_ms = os.environ.get("INSTANA_TIMEOUT", None) if timeout_in_ms is None: @@ -631,33 +635,38 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: value = os.environ.get("INSTANA_LOG_LEVEL", None) if value is not None: - try: - value = value.lower() - if value == "debug": - self.log_level = logging.DEBUG - elif value == "info": - self.log_level = logging.INFO - elif value == "warn" or value == "warning": - self.log_level = logging.WARNING - elif value == "error": - self.log_level = logging.ERROR - else: - logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}") - except Exception: - logger.debug("BaseAgent.update_log_level: ", exc_info=True) + self._apply_log_level(value) + + def _apply_log_level(self, value: str) -> None: + """Set log_level from a raw INSTANA_LOG_LEVEL string.""" + _LOG_LEVELS = { + "debug": logging.DEBUG, + "info": logging.INFO, + "warn": logging.WARNING, + "warning": logging.WARNING, + "error": logging.ERROR, + } + try: + level = _LOG_LEVELS.get(value.lower()) + if level is not None: + self.log_level = level + else: + logger.warning(f"Unknown INSTANA_LOG_LEVEL specified: {value}") + except Exception: + logger.debug("BaseAgent.update_log_level: ", exc_info=True) class AWSLambdaOptions(ServerlessOptions): """Options class for AWS Lambda. Holds settings specific to AWS Lambda.""" - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: super(AWSLambdaOptions, self).__init__() class AWSFargateOptions(ServerlessOptions): """Options class for AWS Fargate. Holds settings specific to AWS Fargate.""" - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: super(AWSFargateOptions, self).__init__() self.tags = None @@ -682,12 +691,12 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: class EKSFargateOptions(AWSFargateOptions): """Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate.""" - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: super(EKSFargateOptions, self).__init__() class GCROptions(ServerlessOptions): """Options class for Google Cloud Run. Holds settings specific to Google Cloud Run.""" - def __init__(self, **kwds: Dict[str, Any]) -> None: + def __init__(self, **kwds: object) -> None: super(GCROptions, self).__init__() diff --git a/tests/test_options.py b/tests/test_options.py index 3beeecf6..02be426a 100644 --- a/tests/test_options.py +++ b/tests/test_options.py @@ -864,6 +864,23 @@ def test_tracing_filter_environment_variables(self) -> None: ], } + def test_asyncio_task_context_propagation_default(self) -> None: + """INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION is False by default.""" + self.base_options = BaseOptions() + assert config["asyncio_task_context_propagation"]["enabled"] is False + + @patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "true"}) + def test_asyncio_task_context_propagation_enabled_via_env(self) -> None: + """INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=true enables the flag.""" + self.base_options = BaseOptions() + assert config["asyncio_task_context_propagation"]["enabled"] is True + + @patch.dict(os.environ, {"INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION": "false"}) + def test_asyncio_task_context_propagation_disabled_via_env(self) -> None: + """INSTANA_ASYNCIO_TASK_CONTEXT_PROPAGATION=false keeps the flag disabled.""" + self.base_options = BaseOptions() + assert config["asyncio_task_context_propagation"]["enabled"] is False + class TestStandardOptions: @pytest.fixture(autouse=True)