(self, dev:AMDDevice)
| 522 | self._q, self.cmd_sizes = hw_view, [len(self.indirect_cmd)] |
| 523 | |
def _submit(self, dev:AMDDevice):
    """Copy this queue's SDMA commands into the device ring and ring the doorbell.

    When the queue is bound to ``dev`` only the indirect command is written,
    front-padded with zero dwords; otherwise the whole command stream
    ``self._q`` is written. Commands that do not fit before the end of the
    ring are written from ring offset 0, after the rest of the ring has been
    zero-filled.

    Args:
        dev: Device whose SDMA queue (selected by ``self.queue_idx``) receives
            the commands.

    Raises:
        AssertionError: If the bytes required by this submit are not smaller
            than the ring size.
    """
    queue = dev.sdma_queue(self.queue_idx)
    ring = queue.ring
    ring_nbytes = ring.nbytes
    # Bytes from the current write position to the end of the ring. put_value
    # is not modified until the first copy below, so one computation serves
    # every capacity check.
    bytes_to_end = ring_nbytes - queue.put_value % ring_nbytes

    if self.binded_device == dev:
        # An IB packet must end on an 8 DW boundary.
        ib_len = len(self.indirect_cmd)
        pad = (8 - (((queue.put_value % 32) // 4) + ib_len % 8)) % 8
        cmds = [0] * pad + self.indirect_cmd
        cmd_sizes = [ib_len + pad]

        if len(cmds) * 4 >= bytes_to_end:
            # Not enough room before the end of the ring for the padded
            # packet: use a fixed two-dword pad and a command size of 8.
            cmds = [0, 0] + self.indirect_cmd
            cmd_sizes = [8]
    else:
        cmds = self._q
        cmd_sizes = self.internal_cmd_sizes

    # Count the leading dwords, in whole commands, that fit before the ring end.
    head_dwords = 0
    for size in cmd_sizes:
        if (head_dwords + size) * 4 >= bytes_to_end:
            break
        head_dwords += size

    wrapped_dwords = len(cmds) - head_dwords

    # Force align of submits to hit our usb layer write cache.
    if wrapped_dwords > 0 and dev.is_usb():
        head_dwords = 0

    # USB devices run in single-step mode, so they can't overrun the queue.
    # wrapped_dwords here is still the count computed before the USB reset above.
    if wrapped_dwords == 0:
        needed_bytes = head_dwords * 4
    else:
        needed_bytes = (-queue.put_value) % ring_nbytes + wrapped_dwords * 4
    assert needed_bytes < ring_nbytes, "SDMA queue overrun"

    # Spin until the reader has advanced far enough; read_ptr[0] is re-read on
    # every iteration.
    while not dev.is_usb() and queue.put_value + needed_bytes - queue.read_ptr[0] > ring_nbytes:
        pass

    # Write the part that fits at the current position.
    first_dword = (queue.put_value % ring_nbytes) // 4
    ring[first_dword : first_dword + head_dwords] = array.array('I', cmds[:head_dwords])
    queue.put_value += head_dwords * 4

    # Recompute the remainder: head_dwords may have been reset for USB devices.
    wrapped_dwords = len(cmds) - head_dwords
    if wrapped_dwords > 0:
        # Zero the rest of the ring, then continue from the start of the ring.
        write_offset = queue.put_value % ring_nbytes
        fill_nbytes = ring_nbytes - write_offset
        ring.view(write_offset, fill_nbytes, fmt='B')[:] = bytes(fill_nbytes)
        queue.put_value += fill_nbytes

        ring[0:wrapped_dwords] = array.array('I', cmds[head_dwords:])
        queue.put_value += wrapped_dwords * 4

    queue.signal_doorbell(dev)
| 561 | |
| 562 | class AMDProgram(HCQProgram): |
| 563 | def __init__(self, dev:AMDDevice, name:str, lib:bytes, **kwargs): |
nothing calls this directly
no test coverage detected