Returns the network information about the user, given the current access token. Will cache results for 2 seconds, to reduce network calls for repetitive hits (like printing the user object).
(self)
| 150 | |
| 151 | |
def get_user_network_info(self) -> dict:
    """
    Return the network information about the user for the current access token.

    Results are cached on ``self.__usrinfo__`` for ``CACHE_SECONDS`` (2 seconds)
    to reduce network calls for repetitive hits (like printing the user object).

    Behaviour, in order:

    1. If the cached dictionary is missing, is not a dict, or lacks any
       required key, it is reset to an all-``None`` record whose ``last_sync``
       is 1970-01-01 (so it is always considered stale).
    2. If ``last_sync`` is less than ``CACHE_SECONDS`` old, the cached
       dictionary is returned without any network call.
    3. If ``self.access_expired`` is truthy, the record is marked as
       disconnected and returned; the other cached values are left untouched.
    4. Otherwise ``self.base_api.auth_validtoken()`` is called. It is expected
       to return a ``(success, response)`` pair where ``response`` supports
       ``in`` and item lookup. On failure the record is marked disconnected;
       on success the user fields are copied in (missing fields become
       ``None`` and are logged) and ``last_sync`` is refreshed.

    ``self.user_id`` is populated from the response's ``userId`` only when it
    is not already set.

    Returns:
        The ``self.__usrinfo__`` dictionary (the same object that is cached),
        with keys ``userId``, ``subscriptionId``, ``restricted``,
        ``quotaExceeded``, ``trial``, ``last_sync``, ``sync_staus`` and
        ``connected_flag``.
    """
    CACHE_SECONDS = 2
    # Fields copied from the auth/validtoken response.
    API_KEYS = ('userId', 'subscriptionId', 'restricted', 'quotaExceeded', 'trial')
    # Keys that must be present for the cached record to be considered well-formed.
    # NOTE: 'sync_staus' (sic) is a runtime key callers may read; do not rename here.
    REQUIRED_KEYS = API_KEYS + ('last_sync', 'sync_staus')

    # if info is missing or malformed, reset
    info = getattr(self, '__usrinfo__', None)
    if not isinstance(info, dict) or any(key not in info for key in REQUIRED_KEYS):
        info = {
            "userId": None,
            "subscriptionId": None,
            "restricted": None,
            "quotaExceeded": None,
            "trial": None,
            # epoch start guarantees the cache check below treats the record as stale
            "last_sync": datetime.datetime(1970, 1, 1),
            "sync_staus": None,
            "connected_flag": False,
        }
        self.__usrinfo__ = info
        self.logger.debug(f'get_user_network_info dictionary reset ({self.user_id})')

    # if last_sync was less than CACHE_SECONDS ago, return existing data and don't repull
    cache_expiry = info['last_sync'] + datetime.timedelta(seconds=CACHE_SECONDS)
    if cache_expiry > datetime.datetime.now():
        self.logger.debug(f'get_user_network_info request satified by cache ({self.user_id})')
        return info

    # if access token is expired, report as disconnected but change nothing else
    if self.access_expired:
        info["sync_staus"] = 'disconnected - authenticate to retrieve'
        info['connected_flag'] = False
        self.logger.debug(f'get_user_network_info request aborted due to lack of access token ({self.user_id})')
        return info

    # make a call to the network:
    success, response = self.base_api.auth_validtoken()
    if not success:
        info["sync_staus"] = 'disconnected - authenticate to retrieve'
        info['connected_flag'] = False
        self.logger.error(f'get_user_network_info request failed due to API call failure ({self.user_id})')
        return info

    # loop thru response, validate, and store
    issue_found = False
    for itm in API_KEYS:
        if itm in response:
            info[itm] = response[itm]
        else:
            # previously this message was built but never emitted
            self.logger.error(f'{itm} not returned from API auth/validtoken')
            info[itm] = None
            issue_found = True

    info['last_sync'] = datetime.datetime.now()
    info['sync_staus'] = (
        'synced with issues - missing value from auth/validtoken API' if issue_found else 'synced'
    )
    info['connected_flag'] = True
    self.logger.debug(f'get_user_network_info request complete and cache updated ({self.user_id})')
    if issue_found:
        self.logger.error(info['sync_staus'])

    # set user_id if not set, but do not overwrite
    if not self.user_id:
        self.user_id = info['userId']

    # the refreshed record must be returned just like the cached/disconnected paths
    return info
no test coverage detected