Sanitize a dictionary of attributes to be a valid OpenTelemetry attributes. Args: attributes: A dictionary of attributes to sanitize. force: Whether to force sanitization even when the value is not JSON serializable.
(attributes: Dict[str, Any], force: bool = True)
| 479 | |
| 480 | |
| 481 | def sanitize_attributes(attributes: Dict[str, Any], force: bool = True) -> Attributes: |
| 482 | """Sanitize a dictionary of attributes to be a valid OpenTelemetry attributes. |
| 483 | |
| 484 | Args: |
| 485 | attributes: A dictionary of attributes to sanitize. |
| 486 | force: Whether to force sanitization even when the value is not JSON serializable. |
| 487 | """ |
| 488 | result: Attributes = {} |
| 489 | for k, v in attributes.items(): |
| 490 | try: |
| 491 | result[k] = sanitize_attribute_value(v, force=force) |
| 492 | except ValueError as exc: |
| 493 | raise ValueError(f"Failed to sanitize attribute '{k}': {exc}") from exc |
| 494 | return result |
| 495 | |
| 496 | |
| 497 | def sanitize_list_attribute_sanity(maybe_list: List[Any]) -> AttributeValue: |