MCPcopy
hub / github.com/diffgram/diffgram / upload_thumbnail_for_connection_image

Function upload_thumbnail_for_connection_image

shared/url_generation.py:87–162  ·  view source on GitHub ↗
(session: Session,
                                          blob_object: DiffgramBlobObjectType,
                                          connection_id: int,
                                          bucket_name: str,
                                          new_offset_in_seconds: int,
                                          member: Member,
                                          access_token: str = None,
                                          reference_file: File = None)

Source from the content-addressed store, hash-verified

85
86
87def upload_thumbnail_for_connection_image(session: Session,
88 blob_object: DiffgramBlobObjectType,
89 connection_id: int,
90 bucket_name: str,
91 new_offset_in_seconds: int,
92 member: Member,
93 access_token: str = None,
94 reference_file: File = None) -> [DiffgramBlobObjectType, dict]:
95 log = regular_log.default()
96 extension = get_blob_file_extension(blob_path = blob_object.url_signed_blob_path)
97 file_dir = get_blob_file_path_without_name(blob_path = blob_object.url_signed_blob_path)
98 file_name = get_blob_file_name(blob_path = blob_object.url_signed_blob_path)
99 blob_path_dirs = get_blob_file_path_without_name(blob_path = blob_object.url_signed_blob_path)
100 blob_path_thumb = f'{blob_path_dirs}thumb/{file_name}'
101 connection = Connection.get_by_id(session = session, id = connection_id)
102 params = {
103 'bucket_name': bucket_name,
104 'path': blob_path_thumb,
105 'expiration_offset': new_offset_in_seconds,
106 'access_token': access_token,
107 'action_type': 'custom_image_upload_url',
108 'event_data': {
109 'request_user': member.user_id
110 }
111 }
112 client, log = get_custom_url_supported_connector(
113 session = session,
114 log = log,
115 connection_id = connection_id,
116 )
117 if regular_log.log_has_error(log):
118 return blob_object, log
119 put_data, log = get_from_connector(connector = client, params = params, log = log)
120 if regular_log.log_has_error(log):
121 if 'blob_exists' in log['error']:
122 log = regular_log.default()
123 blob_object.url_signed_thumb_blob_path = blob_path_thumb
124 session.add(blob_object)
125 return blob_object, log
126 if put_data is None:
127 return blob_object, log
128 url = put_data.get('url')
129 fields = put_data.get('fields')
130 headers = put_data.get('headers')
131 if not url:
132 return blob_object, log
133 # Download Asset and re upload to url
134 temp_dir = tempfile.mkdtemp()
135 temp_dir_path_and_filename = f"{temp_dir}/{file_name}.{extension}"
136 # Get image
137
138 response = requests.get(blob_object.url_signed)
139 if not response.ok:
140 msg = f'Failed to upload thumb. Error getting blob url {blob_object.url_signed_blob_path}'
141 logger.error(msg)
142 log['error']['upload_thumb'] = msg
143 return blob_object, log
144 img_data = response.content

Callers

nothing calls this directly

Calls 8

get_blob_file_extension · Function · 0.85
get_blob_file_name · Function · 0.85
get_from_connector · Function · 0.85
get_by_id · Method · 0.45
add · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected