Execute the provided retrieval job and persist its outcome in the given storage. The storage type (e.g., BigQuery or Redshift) must be the same as the globally configured offline store. After the data has been successfully persisted, a saved dataset object with the dataset metadata is committed to the registry.
(
self,
from_: RetrievalJob,
name: str,
storage: SavedDatasetStorage,
tags: Optional[Dict[str, str]] = None,
feature_service: Optional[FeatureService] = None,
allow_overwrite: bool = False,
)
| 1871 | return job |
| 1872 | |
| 1873 | def create_saved_dataset( |
| 1874 | self, |
| 1875 | from_: RetrievalJob, |
| 1876 | name: str, |
| 1877 | storage: SavedDatasetStorage, |
| 1878 | tags: Optional[Dict[str, str]] = None, |
| 1879 | feature_service: Optional[FeatureService] = None, |
| 1880 | allow_overwrite: bool = False, |
| 1881 | ) -> SavedDataset: |
| 1882 | """ |
| 1883 | Execute provided retrieval job and persist its outcome in given storage. |
| 1884 | Storage type (eg, BigQuery or Redshift) must be the same as globally configured offline store. |
| 1885 | After data successfully persisted saved dataset object with dataset metadata is committed to the registry. |
| 1886 | Name for the saved dataset should be unique within project, since it's possible to overwrite previously stored dataset |
| 1887 | with the same name. |
| 1888 | |
| 1889 | Args: |
| 1890 | from_: The retrieval job whose result should be persisted. |
| 1891 | name: The name of the saved dataset. |
| 1892 | storage: The saved dataset storage object indicating where the result should be persisted. |
| 1893 | tags (optional): A dictionary of key-value pairs to store arbitrary metadata. |
| 1894 | feature_service (optional): The feature service that should be associated with this saved dataset. |
| 1895 | allow_overwrite (optional): If True, the persisted result can overwrite an existing table or file. |
| 1896 | |
| 1897 | Returns: |
| 1898 | SavedDataset object with attached RetrievalJob |
| 1899 | |
| 1900 | Raises: |
| 1901 | ValueError if given retrieval job doesn't have metadata |
| 1902 | """ |
| 1903 | if not flags_helper.is_test(): |
| 1904 | warnings.warn( |
| 1905 | "Saving dataset is an experimental feature. " |
| 1906 | "This API is unstable and it could and most probably will be changed in the future. " |
| 1907 | "We do not guarantee that future changes will maintain backward compatibility.", |
| 1908 | RuntimeWarning, |
| 1909 | ) |
| 1910 | |
| 1911 | if not from_.metadata: |
| 1912 | raise ValueError( |
| 1913 | f"The RetrievalJob {type(from_)} must implement the metadata property." |
| 1914 | ) |
| 1915 | |
| 1916 | dataset = SavedDataset( |
| 1917 | name=name, |
| 1918 | features=from_.metadata.features, |
| 1919 | join_keys=from_.metadata.keys, |
| 1920 | full_feature_names=from_.full_feature_names, |
| 1921 | storage=storage, |
| 1922 | tags=tags, |
| 1923 | feature_service_name=feature_service.name if feature_service else None, |
| 1924 | ) |
| 1925 | |
| 1926 | dataset.min_event_timestamp = from_.metadata.min_event_timestamp |
| 1927 | dataset.max_event_timestamp = from_.metadata.max_event_timestamp |
| 1928 | |
| 1929 | from_.persist(storage=storage, allow_overwrite=allow_overwrite) |
| 1930 |