MCPcopy
hub / github.com/microsoft/agent-lightning / set_input

Method set_input

agentlightning/emitter/annotation.py:143–171  ·  view source on GitHub ↗

Record input arguments on the current span. Positional arguments are stored under the `input.args. ` attributes, and keyword arguments are stored under `input. ` attributes. This is intended for use inside a `with operation(...) as op` block. Args:

(self, *args: Any, **kwargs: Any)

Source from the content-addressed store, hash-verified

141 return self._span
142
143 def set_input(self, *args: Any, **kwargs: Any) -> None:
144 """Record input arguments on the current span.
145
146 Positional arguments are stored under the `input.args.<index>` attributes,
147 and keyword arguments are stored under `input.<name>` attributes.
148
149 This is intended for use inside a `with operation(...) as op` block.
150
151 Args:
152 *args: Positional arguments to record.
153 **kwargs: Keyword arguments to record.
154 """
155 if not self._recording_context:
156 raise RuntimeError("No recording context found. Cannot set input.")
157
158 prefix = LightningSpanAttributes.OPERATION_INPUT.value
159 attributes: Dict[str, Any] = {}
160 if args:
161 for idx, value in enumerate(args):
162 flattened = flatten_attributes({str(idx): value})
163 for nested_key, nested_value in flattened.items():
164 attributes[f"{prefix}.args.{nested_key}"] = nested_value
165 if kwargs:
166 for key, value in kwargs.items():
167 flattened = flatten_attributes({key: value})
168 for nested_key, nested_value in flattened.items():
169 attributes[f"{prefix}.{nested_key}"] = nested_value
170 if attributes:
171 self._recording_context.record_attributes(sanitize_attributes(attributes))
172
173 def set_output(self, output: Any) -> None:
174 """Record the output value on the current span.

Calls 3

flatten_attributesFunction · 0.90
sanitize_attributesFunction · 0.90
record_attributesMethod · 0.45