(
self, request: RegistryServer_pb2.ApplyDataSourceRequest, context
)
| 243 | return Empty() |
| 244 | |
| 245 | def ApplyDataSource( |
| 246 | self, request: RegistryServer_pb2.ApplyDataSourceRequest, context |
| 247 | ): |
| 248 | data_source = cast( |
| 249 | DataSource, |
| 250 | assert_permissions_to_update( |
| 251 | resource=DataSource.from_proto(request.data_source), |
| 252 | getter=self.proxied_registry.get_data_source, |
| 253 | project=request.project, |
| 254 | ), |
| 255 | ) |
| 256 | self.proxied_registry.apply_data_source( |
| 257 | data_source=data_source, |
| 258 | project=request.project, |
| 259 | commit=request.commit, |
| 260 | ) |
| 261 | |
| 262 | return Empty() |
| 263 | |
| 264 | def GetDataSource(self, request: RegistryServer_pb2.GetDataSourceRequest, context): |
| 265 | return assert_permissions( |
nothing calls this directly
no test coverage detected