processMessage is an infinite loop reading incoming socks connection data from the agent and sending it to the SOCKS client
()
| 247 | |
| 248 | // processMessage is an infinite loop reading incoming socks connection data from the agent and sending it to the SOCKS client |
| 249 | func processMessage() { |
| 250 | for { |
| 251 | job := <-jobsIn |
| 252 | agent := job.AgentID |
| 253 | socks := job.Payload.(merlinJob.Socks) |
| 254 | |
| 255 | // Make sure the connection ID is known |
| 256 | // The Agent can send back data for a connection that has been closed and deleted by the SOCKS client |
| 257 | // So drop the job |
| 258 | conn, ok := connections.Load(socks.ID) |
| 259 | if !ok { |
| 260 | slog.Debug("Unknown SOCKS connection", "ID", socks.ID, "Index", socks.Index, "Data Length", len(socks.Data)) |
| 261 | continue |
| 262 | } |
| 263 | |
| 264 | // Ensure this is the right index |
| 265 | if conn.(*Connection).Index == socks.Index { |
| 266 | n, err := conn.(*Connection).Conn.Write(socks.Data) |
| 267 | conn.(*Connection).Index++ |
| 268 | if err != nil { |
| 269 | slog.Error("there was an error writing to the SOCKS client", "Agent", agent, "ID", socks.ID, "Index", socks.Index, "Close", socks.Close, "Data Length", len(socks.Data), "Error", err) |
| 270 | continue |
| 271 | } |
| 272 | if core.Debug { |
| 273 | slog.Debug(fmt.Sprintf("Wrote %d bytes with message index %d to the SOCKS client for agent %s connection ID %s with error %s", n, socks.Index, agent, socks.ID, err)) |
| 274 | } |
| 275 | } else { |
| 276 | if core.Debug { |
| 277 | slog.Debug(fmt.Sprintf("Received job out of order for agent %s connection %s. Expected %d, got %d", agent, socks.ID, conn.(*Connection).Index, socks.Index)) |
| 278 | } |
| 279 | jobsIn <- job |
| 280 | } |
| 281 | } |
| 282 | } |
| 283 | |
| 284 | // In is the entrypoint for accepting SOCKS messages that came in from the agent and need to be sent to the SOCKS client |
| 285 | func In(job merlinJob.Job) { |