MCP · copy · Index your code
hub / github.com/Huanshere/VideoLingo / process_batch

Function process_batch

batch/utils/batch_processor.py:25–94  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

23 return original_source_lang, original_target_lang
24
25def process_batch():
26 if not check_settings():
27 raise Exception("Settings check failed")
28
29 df = pd.read_excel('batch/tasks_setting.xlsx')
30 for index, row in df.iterrows():
31 if pd.isna(row['Status']) or 'Error' in str(row['Status']):
32 total_tasks = len(df)
33 video_file = row['Video File']
34
35 if not pd.isna(row['Status']) and 'Error' in str(row['Status']):
36 console.print(Panel(f"Retrying failed task: {video_file}\nTask {index + 1}/{total_tasks}",
37 title="[bold yellow]Retry Task", expand=False))
38
39 # Restore files from batch/output/ERROR to output
40 error_folder = os.path.join('batch', 'output', 'ERROR', os.path.splitext(video_file)[0])
41
42 if os.path.exists(error_folder):
43 # Ensure the output folder exists
44 os.makedirs('output', exist_ok=True)
45
46 # Copy all contents from ERROR folder for the specific video to output
47 for item in os.listdir(error_folder):
48 src_path = os.path.join(error_folder, item)
49 dst_path = os.path.join('output', item)
50
51 if os.path.isdir(src_path):
52 if os.path.exists(dst_path):
53 shutil.rmtree(dst_path)
54 shutil.copytree(src_path, dst_path)
55 else:
56 if os.path.exists(dst_path):
57 os.remove(dst_path)
58 shutil.copy2(src_path, dst_path)
59
60 console.print(f"[green]Restored files from ERROR folder for {video_file}")
61 else:
62 console.print(f"[yellow]Warning: Error folder not found: {error_folder}")
63 else:
64 console.print(Panel(f"Now processing task: {video_file}\nTask {index + 1}/{total_tasks}",
65 title="[bold blue]Current Task", expand=False))
66
67 source_language = row['Source Language']
68 target_language = row['Target Language']
69
70 original_source_lang, original_target_lang = record_and_update_config(source_language, target_language)
71
72 try:
73 dubbing = 0 if pd.isna(row['Dubbing']) else int(row['Dubbing'])
74 is_retry = not pd.isna(row['Status']) and 'Error' in str(row['Status'])
75 status, error_step, error_message = process_video(video_file, dubbing, is_retry)
76 status_msg = "Done" if status else f"Error: {error_step} - {error_message}"
77 except Exception as e:
78 status_msg = f"Error: Unhandled exception - {str(e)}"
79 console.print(f"[bold red]Error processing {video_file}: {status_msg}")
80 finally:
81 update_key('whisper.language', original_source_lang)
82 update_key('target_language', original_target_lang)

Callers 1

batch_processor.py — File · 0.85

Calls 4

check_settings — Function · 0.90
process_video — Function · 0.90
update_key — Function · 0.90
record_and_update_config — Function · 0.85

Tested by

no test coverage detected