(self, protocolOp, controls: List[LDAP_Control] = None, **kwargs)
| 1887 | self.sock = StreamSocket(sock, LDAP) |
| 1888 | |
def sr1(self, protocolOp, controls: List[LDAP_Control] = None, **kwargs):
    """
    Send a single LDAP request and return the matching response.

    The request is given the next message ID (``self.messageID`` is
    incremented on every call). When ``self.sasl_wrap`` is set, the
    serialized request is passed through ``self.ssp.GSS_Wrap`` and sent
    inside an ``LDAP_SASL_Buffer``; the response buffer is passed through
    ``self.ssp.GSS_Unwrap`` and re-parsed as ``LDAP``.

    :param protocolOp: the operation to place in the ``protocolOp`` field
        of the LDAP message.
    :param controls: optional list of controls placed in the ``Controls``
        field of the LDAP message.
    :param kwargs: forwarded unchanged to ``self.sock.sr1``.
    :returns: the response packet. An unsolicited notification is returned
        as received, without unwrapping. ``None`` is returned when nothing
        was received, or when SASL wrapping is active and the response
        carries an empty ``Buffer``.
    """
    self.messageID += 1
    if self.verb:
        print(conf.color_theme.opening(">> %s" % protocolOp.__class__.__name__))
    # Build packet
    pkt = LDAP(
        messageID=self.messageID,
        protocolOp=protocolOp,
        Controls=controls,
    )
    # If signing / encryption is used, apply
    if self.sasl_wrap:
        pkt = LDAP_SASL_Buffer(
            Buffer=self.ssp.GSS_Wrap(
                self.sspcontext,
                bytes(pkt),
                conf_req_flag=self.encrypt,
            )
        )
    # Send / Receive
    resp = self.sock.sr1(
        pkt,
        verbose=0,
        **kwargs,
    )
    # Check for unsolicited notification: it is handed back to the caller
    # as-is, before any SASL unwrapping is attempted.
    if resp and LDAP in resp and resp[LDAP].unsolicited:
        if self.verb:
            resp.show()
            print(conf.color_theme.fail("! Got unsolicited notification."))
        return resp
    # If signing / encryption is used, unpack
    if self.sasl_wrap:
        # 'resp' may be None when nothing was received; guard it so the
        # missing answer reaches the "Bad response" handling below instead
        # of raising AttributeError on 'resp.Buffer'.
        if resp is not None and resp.Buffer:
            resp = LDAP(
                self.ssp.GSS_Unwrap(
                    self.sspcontext,
                    resp.Buffer,
                )
            )
        else:
            resp = None
    if self.verb:
        if not resp:
            print(conf.color_theme.fail("! Bad response."))
            return
        else:
            print(
                conf.color_theme.success(
                    "<< %s"
                    % (
                        resp.protocolOp.__class__.__name__
                        if LDAP in resp
                        else resp.__class__.__name__
                    )
                )
            )
    return resp
no test coverage detected