MCPcopy
hub / github.com/pathwaycom/pathway / _convert_from_json

Function _convert_from_json

python/pathway/stdlib/utils/col.py:132–180  ·  view source on GitHub ↗
(name: str, col: pw.ColumnExpression)

Source from the content-addressed store, hash-verified

130 typehints = schema._dtypes()
131
def _convert_from_json(name: str, col: pw.ColumnExpression):
    """Build the expression converting JSON column ``col`` to the dtype of ``name``.

    The target dtype is read from the enclosing ``typehints`` mapping under
    ``name``. Supported targets:

    * JSON -- ``col`` is used unchanged;
    * bool / float / int / str -- ``col.as_bool()`` / ``as_float()`` /
      ``as_int()`` / ``as_str()``;
    * naive and UTC datetimes -- the string form of ``col`` is parsed with
      ``strptime`` using a fixed format;
    * duration -- the integer form of ``col`` is passed to
      ``.dt.to_duration("ns")``.

    For datetime and duration targets declared optional, a value equal to
    ``pw.Json.NULL`` yields ``None`` instead of being parsed.

    Returns:
        The converted expression; it is additionally wrapped in
        ``pw.unwrap`` when the declared dtype is not optional.

    Raises:
        TypeError: if the declared dtype is none of the supported targets.
    """
    declared = typehints[name]
    target = dt.unoptionalize(declared)
    nullable = isinstance(declared, dt.Optional)

    def _null_aware(
        value: pw.ColumnExpression,
        parse: Callable[[pw.ColumnExpression], pw.ColumnExpression],
    ) -> pw.ColumnExpression:
        # Only optional columns get the explicit Json-null short-circuit.
        if not nullable:
            return parse(value)
        is_json_null = value == pw.Json.NULL
        return pw.if_else(is_json_null, None, parse(value))

    def _strptime_with(fmt: str):
        # Returns a parser that reads the JSON string and applies ``fmt``.
        def _parse(value: pw.ColumnExpression) -> pw.ColumnExpression:
            return pw.unwrap(value.as_str()).dt.strptime(fmt)

        return _parse

    def _as_duration(value: pw.ColumnExpression) -> pw.ColumnExpression:
        return pw.unwrap(value.as_int()).dt.to_duration("ns")

    converted: pw.ColumnExpression
    if target == dt.JSON:
        converted = col
    elif target == dt.BOOL:
        converted = col.as_bool()
    elif target == dt.FLOAT:
        converted = col.as_float()
    elif target == dt.INT:
        converted = col.as_int()
    elif target == dt.STR:
        converted = col.as_str()
    elif target == dt.DATE_TIME_NAIVE:
        converted = _null_aware(col, _strptime_with("%Y-%m-%dT%H:%M:%S.%f"))
    elif target == dt.DATE_TIME_UTC:
        converted = _null_aware(col, _strptime_with("%Y-%m-%dT%H:%M:%S.%f%z"))
    elif target == dt.DURATION:
        converted = _null_aware(col, _as_duration)
    else:
        raise TypeError(f"Unsupported conversion from pw.Json to {declared}")

    if nullable:
        return converted
    return pw.unwrap(converted)
181
182 colrefs = [pw.this[column_name] for column_name in schema.column_names()]
183 kw = {

Callers 1

unpack_col_dict · Function · 0.85

Calls 7

_optional · Function · 0.85
strptime · Method · 0.80
to_duration · Method · 0.80
as_bool · Method · 0.45
as_float · Method · 0.45
as_int · Method · 0.45
as_str · Method · 0.45

Tested by

no test coverage detected