()
| 15 | |
| 16 | |
def _print_listing(session, heading):
    """Print ``heading``, then the item count and one line per item listed at ``/``."""
    print(heading)
    listing = session.experimental.files.list(path="/")
    entries = listing.items
    print(f" Items: {len(entries)}")
    for entry in entries:
        print(f" - {entry.mount_relative_path} ({entry.size} bytes)")


def main():
    """Walk through the tool router session file API, printing each step.

    Creates a session for ``demo_files_user``, uploads a JSON payload from
    bytes, lists and downloads it, uploads a second file from a local
    temporary path, lists again, deletes both uploads, and finally runs a
    tool search for "send email".
    """
    composio = Composio(provider=OpenAIProvider())

    # Open the session that every later call goes through.
    print("Creating tool router session...")
    session = composio.tool_router.create(user_id="demo_files_user")
    print(f" Session ID: {session.session_id}")

    # First upload: in-memory bytes with an explicit remote path and mimetype.
    print("\nUploading file (bytes)...")
    remote = session.experimental.files.upload(
        b'{"hello": "world"}',
        remote_path="test_data.json",
        mimetype="application/json",
    )
    print(f" Uploaded to: {remote.mount_relative_path}")

    # Show what the mount contains after the first upload.
    _print_listing(session, "\nListing files...")

    # Fetch the uploaded file back and show at most its first 80 characters.
    print("\nDownloading file...")
    downloaded = session.experimental.files.download(remote.mount_relative_path)
    preview = downloaded.text()[:80]
    print(f" Content: {preview}...")

    # Second upload: from a path on disk. delete=False keeps the temp file
    # around after the ``with`` block so it can be uploaded by name; the
    # ``finally`` clause removes it whether or not the upload succeeds.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as scratch:
        scratch.write("Hello from local file")
        scratch_name = scratch.name
    try:
        print("\nUploading from local path...")
        remote2 = session.experimental.files.upload(scratch_name)
        print(f" Uploaded to: {remote2.mount_relative_path}")
    finally:
        Path(scratch_name).unlink(missing_ok=True)

    # Show the mount again now that the second file is there.
    _print_listing(session, "\nListing files (after 2nd upload)...")

    # Remove both uploads, in the order they were created.
    print("\nDeleting test files...")
    for uploaded in (remote, remote2):
        session.experimental.files.delete(uploaded.mount_relative_path)
    print(" Deleted.")

    # Finish with a tool search on the same session.
    print("\nSearching for tools...")
    search_result = session.search(query="send email")
    print(f" Success: {search_result.success}, Results: {len(search_result.results)}")
no test coverage detected
searching dependent graphs…