Creates a bucket :returns: the name of the bucket created
(session, name=None, region=None)
| 174 | |
| 175 | |
def create_bucket(session, name=None, region=None):
    """
    Create an S3 bucket, enable default encryption on it, and drop its
    public access block.

    :param session: object whose ``create_client`` method is used to build
        the ``s3`` client for the chosen region
    :param name: bucket name to use; when falsy, a name is obtained from
        ``random_bucket_name()``
    :param region: region for the client and the bucket; when falsy,
        ``'us-west-2'`` is used
    :returns: the name of the bucket created
    :raises ClientError: for any create-bucket failure other than
        ``BucketAlreadyOwnedByYou``
    """
    region = region or 'us-west-2'
    s3 = session.create_client('s3', region_name=region)
    bucket_name = name or random_bucket_name()

    create_kwargs = {'Bucket': bucket_name, 'ObjectOwnership': 'ObjectWriter'}
    # The location constraint is only sent for regions other than us-east-1.
    if region != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {
            'LocationConstraint': region,
        }

    try:
        s3.create_bucket(**create_kwargs)
    except ClientError as err:
        # BucketAlreadyOwnedByYou can show up on a retried request: the
        # first attempt succeeded on S3 but its response never came back.
        # The bucket is ready for the test in that case, so carry on;
        # every other error is propagated unchanged.
        if err.response['Error'].get('Code') != 'BucketAlreadyOwnedByYou':
            raise

    # Default to AES256 server-side encryption and list 'NONE' as a
    # blocked encryption type.
    encryption_rule = {
        'ApplyServerSideEncryptionByDefault': {
            'SSEAlgorithm': 'AES256',
        },
        'BlockedEncryptionTypes': {
            'EncryptionType': ['NONE'],
        },
    }
    s3.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={'Rules': [encryption_rule]},
    )
    s3.delete_public_access_block(Bucket=bucket_name)
    return bucket_name
| 218 | |
| 219 | |
| 220 | def create_dir_bucket(session, name=None, location=None): |
no test coverage detected