MCPcopy Index your code
hub / github.com/piccolo-orm/piccolo / run

Method run

piccolo/query/methods/objects.py:197–229  ·  view source on GitHub ↗
(
        self,
        node: Optional[str] = None,
        in_pool: bool = True,
    )

Source from the content-addressed store, hash-verified

195 self.values = values
196
async def run(
    self,
    node: Optional[str] = None,
    in_pool: bool = True,
) -> None:
    """
    Write ``self.values`` to the database for ``self.row`` and refresh the
    row object with the values the database hands back.

    The UPDATE is restricted to the row whose primary key equals the one
    stored on ``self.row``, and asks for the updated columns to be
    returned so they can be copied onto the in-memory object.

    :param node:
        Forwarded unchanged to the update query's ``run``.
    :param in_pool:
        Forwarded unchanged to the update query's ``run``.
    :raises ValueError:
        If ``self.row._exists_in_db`` is falsy, or if the row's primary
        key value is ``None``.

    """
    row = self.row
    if not row._exists_in_db:
        raise ValueError("This row doesn't exist in the database.")

    table = row.__class__
    pk_column = table._meta.primary_key
    pk_value = getattr(row, pk_column._meta.name)
    if pk_value is None:
        raise ValueError("The primary key is None")

    # Keys of ``self.values`` may be given as names or as column objects;
    # names are resolved through the table's metadata so that
    # ``returning`` receives a uniform list.
    returned_columns = []
    for key in self.values.keys():
        if isinstance(key, str):
            key = table._meta.get_column_by_name(key)
        returned_columns.append(key)

    query = (
        table.update(self.values)
        .where(pk_column == pk_value)
        .returning(*returned_columns)
    )
    rows = await query.run(node=node, in_pool=in_pool)

    # NOTE(review): indexing the first element assumes the UPDATE matched
    # a row; an empty response would raise IndexError here - confirm
    # whether that can happen (e.g. row deleted elsewhere).
    for name, value in rows[0].items():
        setattr(row, name, value)
230
231 def __await__(self) -> Generator[None, None, None]:
232 """

Callers 2

__await__Method · 0.95
run_syncMethod · 0.95

Calls 5

get_column_by_nameMethod · 0.80
updateMethod · 0.80
runMethod · 0.45
returningMethod · 0.45
whereMethod · 0.45

Tested by

no test coverage detected