hub / github.com/diffgram/diffgram / connection_url_regenerate

Function connection_url_regenerate

shared/url_generation.py:217–275
(session: Session,
                              blob_object: DiffgramBlobObjectType,
                              connection_id: int,
                              bucket_name: str,
                              new_offset_in_seconds: int,
                              access_token: str = None,
                              reference_file: File = None)


def connection_url_regenerate(session: Session,
                              blob_object: DiffgramBlobObjectType,
                              connection_id: int,
                              bucket_name: str,
                              new_offset_in_seconds: int,
                              access_token: str = None,
                              reference_file: File = None) -> tuple[DiffgramBlobObjectType, dict]:

    log = regular_log.default()
    member = get_member(session = session)

    # Parameters passed to the connector to request a pre-signed URL.
    # The path comes from reference_file when one is given, else from the blob itself.
    params = {
        'bucket_name': bucket_name,
        'path': blob_object.url_signed_blob_path if reference_file is None else reference_file.get_blob_path(),
        'expiration_offset': new_offset_in_seconds,
        'access_token': access_token,
        'action_type': 'get_pre_signed_url',
        'event_data': {
            'request_user': member.user_id
        }
    }
    client, log = get_custom_url_supported_connector(
        session = session,
        log = log,
        connection_id = connection_id,
    )
    # On any logged error, return the blob unchanged along with the log.
    if regular_log.log_has_error(log):
        return blob_object, log

    result, log = get_from_connector(
        connector = client,
        params = params,
        log = log)

    if regular_log.log_has_error(log):
        return blob_object, log

    # The same signed URL is stored for both the full asset and the thumbnail.
    blob_object.url_signed = result.get('signed_url')
    blob_object.url_signed_thumb = result.get('signed_url')
    error = result.get('error')
    if error:
        try:
            blob_object.error = result.get('error')
        except Exception:
            blob_object.error = str(result.get('error'))
    # Extra assets (Depending on type)

    if type(blob_object) == TextFile and blob_object.tokens_url_signed_blob_path:
        blob_object, log = generate_text_token_url(
            session = session,
            blob_object = blob_object,
            params = params,
            log = log,
            client = client,
        )
    session.add(blob_object)  # TODO review if needed to add to session here
    # If it's unique for every user then it's not clear why we would need to do this
    # And adds processing delays for viewing large amounts at once.
    return blob_object, log
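The control flow above (build params, obtain a connector, return early on a logged error, then copy the signed URL onto the blob) can be illustrated with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in: `StubBlob`, `StubConnector`, `new_log`, `log_has_error`, and `regenerate` are not part of Diffgram, and the log shape (a dict with an `error` sub-dict) is an assumption about how `regular_log` behaves.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class StubBlob:
    """Hypothetical stand-in for a Diffgram blob object (not the real model)."""
    url_signed_blob_path: str
    url_signed: Optional[str] = None
    url_signed_thumb: Optional[str] = None
    error: Optional[str] = None


class StubConnector:
    """Hypothetical connector that 'signs' a path by building a fake URL."""

    def fetch(self, params: dict) -> dict:
        if not params.get("bucket_name"):
            return {"error": "bucket_name is required"}
        url = "https://example.invalid/{}/{}?expires={}".format(
            params["bucket_name"], params["path"], params["expiration_offset"])
        return {"signed_url": url}


def new_log() -> dict:
    # Assumed log shape; the real regular_log structure may differ.
    return {"error": {}, "info": {}}


def log_has_error(log: dict) -> bool:
    return len(log["error"]) > 0


def regenerate(blob: StubBlob,
               connector: Optional[StubConnector],
               bucket_name: str,
               offset_seconds: int) -> Tuple[StubBlob, dict]:
    log = new_log()
    if connector is None:
        # Mirrors the early return: the blob is handed back untouched.
        log["error"]["connection"] = "no connector available"
        return blob, log

    params = {
        "bucket_name": bucket_name,
        "path": blob.url_signed_blob_path,
        "expiration_offset": offset_seconds,
        "action_type": "get_pre_signed_url",
    }
    result = connector.fetch(params)

    # Same URL for the asset and its thumbnail, as in the listing above.
    blob.url_signed = result.get("signed_url")
    blob.url_signed_thumb = result.get("signed_url")
    if result.get("error"):
        # A connector-level error is stored on the blob, not in the log.
        blob.error = str(result["error"])
    return blob, log


blob, log = regenerate(StubBlob("images/cat.png"), StubConnector(), "my-bucket", 3600)
print(blob.url_signed)
# prints: https://example.invalid/my-bucket/images/cat.png?expires=3600
```

Callers of this pattern should check both the returned log and `blob.error`, since in this sketch a connector-reported error clears the signed URL without marking the log as failed.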

Callers 1

blob_regenerate_url · Function · 0.85

Calls 7

get_member · Function · 0.90
get_from_connector · Function · 0.85
generate_text_token_url · Function · 0.85
get_blob_path · Method · 0.80
get · Method · 0.45
add · Method · 0.45

Tested by

no test coverage detected