MCPcopy
hub / github.com/hacs/integration / test_update_entity_state

Function test_update_entity_state

tests/test_update.py:27–96  ·  view source on GitHub ↗
(
    time_freezer: FrozenDateTimeFactory,
    hass: HomeAssistant,
    response_mocker: ResponseMocker,
    setup_integration: Generator,
    category_test_data: CategoryTestData,
    snapshots: SnapshotFixture,
)

Source from the content-addressed store, hash-verified

25
26@pytest.mark.parametrize("category_test_data", category_test_data_parametrized())
27async def test_update_entity_state(
28 time_freezer: FrozenDateTimeFactory,
29 hass: HomeAssistant,
30 response_mocker: ResponseMocker,
31 setup_integration: Generator,
32 category_test_data: CategoryTestData,
33 snapshots: SnapshotFixture,
34):
35 hacs = get_hacs(hass)
36 repo = hacs.repositories.get_by_full_name(category_test_data["repository"])
37
38 assert repo is not None
39
40 repo.data.installed = True
41 repo.data.installed_version = category_test_data["version_base"]
42
43 await hass.config_entries.async_reload(hacs.configuration.config_entry.entry_id)
44 await hass.async_block_till_done()
45
46 # Get a new HACS instance after reload
47 hacs = get_hacs(hass)
48
49 er = async_get_entity_registry(hacs.hass)
50
51 entity_id = er.async_get_entity_id("update", DOMAIN, repo.data.id)
52
53 # Get initial state
54 state = hass.states.get(entity_id)
55 initial_state = recursive_remove_key(
56 state.as_dict(), ("id", "last_changed", "last_reported", "last_updated",
57 "display_precision", "update_percentage"),
58 )
59
60 # Bump version
61 fixture_file = f"fixtures/proxy/data-v2.hacs.xyz/{
62 category_test_data['category']}/data.json"
63 fp = os.path.join(
64 os.path.dirname(__file__),
65 fixture_file,
66 )
67 with open(fp, encoding="utf-8") as fptr:
68 data = json_func.loads(fptr.read())
69
70 for repo_data in data.values():
71 repo_data["last_fetched"] += 1
72 repo_data["last_version"] = "2.0.0"
73
74 response_mocker.add(
75 f"https://data-v2.hacs.xyz/{category_test_data['category']}/data.json",
76 MockedResponse(content=data),
77 )
78
79 repo = hacs.repositories.get_by_full_name(category_test_data["repository"])
80
81 time_freezer.tick(3600 * 48 + 1)
82 await hass.async_block_till_done()
83 await hass.async_block_till_done()
84

Callers

nothing calls this directly

Calls 8

get_hacsFunction · 0.90
recursive_remove_keyFunction · 0.90
MockedResponseClass · 0.90
safe_json_dumpsFunction · 0.90
get_by_full_nameMethod · 0.80
getMethod · 0.80
readMethod · 0.45
addMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…