MCPcopy
hub / github.com/locustio/locust / test_custom_arguments

Method test_custom_arguments

locust/test/test_main.py:80–118  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

78
79 @unittest.skipIf(IS_WINDOWS, reason="Signal handling on windows is hard")
80 def test_custom_arguments(self):
81 port = get_free_tcp_port()
82 with temporary_file(
83 content=textwrap.dedent(
84 """
85 from locust import User, task, constant, events
86 @events.init_command_line_parser.add_listener
87 def _(parser, **kw):
88 parser.add_argument("--custom-string-arg")
89
90 class TestUser(User):
91 wait_time = constant(10)
92 @task
93 def my_task(self):
94 print(self.environment.parsed_options.custom_string_arg)
95 """
96 )
97 ) as file_path:
98 # print(subprocess.check_output(["cat", file_path]))
99 with TestProcess(
100 f"locust -f {file_path} --custom-string-arg command_line_value --web-port {port}",
101 ) as tp:
102 tp.expect("Starting Locust")
103 wait_for_server(f"http://127.0.0.1:{port}/swarm")
104 requests.post(
105 f"http://127.0.0.1:{port}/swarm",
106 data={
107 "user_count": 1,
108 "spawn_rate": 1,
109 "host": "https://localhost",
110 "custom_string_arg": "web_form_value",
111 },
112 )
113 tp.expect('All users spawned: {"TestUser": 1} (1 total users)')
114 tp.expect("web_form_value", stream="stdout")
115 tp.terminate()
116 tp.expect("Shutting down")
117 tp.expect("Aggregated")
118 tp.not_expect_any("command_line_value")
119
120 @unittest.skipIf(IS_WINDOWS, reason="Signal handling on windows is hard")
121 def test_custom_arguments_in_file(self):

Callers

nothing calls this directly

Calls 8

get_free_tcp_portFunction · 0.85
temporary_fileFunction · 0.85
TestProcessClass · 0.85
wait_for_serverFunction · 0.85
expectMethod · 0.80
terminateMethod · 0.80
not_expect_anyMethod · 0.80
postMethod · 0.45

Tested by

no test coverage detected