prepareSockets sets up the ZMQ sockets through which the kernel will communicate.
(connInfo ConnectionInfo)
| 256 | // prepareSockets sets up the ZMQ sockets through which the kernel |
| 257 | // will communicate. |
| 258 | func prepareSockets(connInfo ConnectionInfo) (SocketGroup, error) { |
| 259 | // Initialize the socket group. |
| 260 | var ( |
| 261 | sg SocketGroup |
| 262 | err error |
| 263 | ctx = context.Background() |
| 264 | ) |
| 265 | |
| 266 | // Create the shell socket, a request-reply socket that may receive messages from multiple frontend for |
| 267 | // code execution, introspection, auto-completion, etc. |
| 268 | sg.ShellSocket.Socket = zmq4.NewRouter(ctx) |
| 269 | sg.ShellSocket.Lock = &sync.Mutex{} |
| 270 | |
| 271 | // Create the control socket. This socket is a duplicate of the shell socket where messages on this channel |
| 272 | // should jump ahead of queued messages on the shell socket. |
| 273 | sg.ControlSocket.Socket = zmq4.NewRouter(ctx) |
| 274 | sg.ControlSocket.Lock = &sync.Mutex{} |
| 275 | |
| 276 | // Create the stdin socket, a request-reply socket used to request user input from a front-end. This is analogous |
| 277 | // to a standard input stream. |
| 278 | sg.StdinSocket.Socket = zmq4.NewRouter(ctx) |
| 279 | sg.StdinSocket.Lock = &sync.Mutex{} |
| 280 | |
| 281 | // Create the iopub socket, a publisher for broadcasting data like stdout/stderr output, displaying execution |
| 282 | // results or errors, kernel status, etc. to connected subscribers. |
| 283 | sg.IOPubSocket.Socket = zmq4.NewPub(ctx) |
| 284 | sg.IOPubSocket.Lock = &sync.Mutex{} |
| 285 | |
| 286 | // Create the heartbeat socket, a request-reply socket that only allows alternating recv-send (request-reply) |
| 287 | // calls. It should echo the byte strings it receives to let the requester know the kernel is still alive. |
| 288 | sg.HBSocket.Socket = zmq4.NewRep(ctx) |
| 289 | sg.HBSocket.Lock = &sync.Mutex{} |
| 290 | |
| 291 | // Bind the sockets. |
| 292 | address := fmt.Sprintf("%v://%v:%%v", connInfo.Transport, connInfo.IP) |
| 293 | err = sg.ShellSocket.Socket.Listen(fmt.Sprintf(address, connInfo.ShellPort)) |
| 294 | if err != nil { |
| 295 | return sg, fmt.Errorf("could not listen on shell-socket: %w", err) |
| 296 | } |
| 297 | |
| 298 | err = sg.ControlSocket.Socket.Listen(fmt.Sprintf(address, connInfo.ControlPort)) |
| 299 | if err != nil { |
| 300 | return sg, fmt.Errorf("could not listen on control-socket: %w", err) |
| 301 | } |
| 302 | |
| 303 | err = sg.StdinSocket.Socket.Listen(fmt.Sprintf(address, connInfo.StdinPort)) |
| 304 | if err != nil { |
| 305 | return sg, fmt.Errorf("could not listen on stdin-socket: %w", err) |
| 306 | } |
| 307 | |
| 308 | err = sg.IOPubSocket.Socket.Listen(fmt.Sprintf(address, connInfo.IOPubPort)) |
| 309 | if err != nil { |
| 310 | return sg, fmt.Errorf("could not listen on iopub-socket: %w", err) |
| 311 | } |
| 312 | |
| 313 | err = sg.HBSocket.Socket.Listen(fmt.Sprintf(address, connInfo.HBPort)) |
| 314 | if err != nil { |
| 315 | return sg, fmt.Errorf("could not listen on hbeat-socket: %w", err) |