(self, **kw)
| 125 | self.part_json["streamSettings"]["kcpSettings"]["header"]["type"] = type_str |
| 126 | |
| 127 | def write(self, **kw): |
| 128 | security_backup, tls_settings_backup, origin_protocol, domain = "", "", None, "" |
| 129 | no_tls_group = (StreamType.MTPROTO, StreamType.SS) |
| 130 | |
| 131 | if self.part_json['protocol'] == 'shadowsocks': |
| 132 | origin_protocol = StreamType.SS |
| 133 | elif self.part_json['protocol'] == 'mtproto': |
| 134 | origin_protocol = StreamType.MTPROTO |
| 135 | elif self.part_json['protocol'] == 'socks': |
| 136 | origin_protocol = StreamType.SOCKS |
| 137 | elif self.part_json['protocol'] == 'vless': |
| 138 | if self.part_json["streamSettings"]["network"] == "grpc": |
| 139 | origin_protocol = StreamType.VLESS_GRPC |
| 140 | elif self.part_json["streamSettings"]["security"] == "reality": |
| 141 | origin_protocol = StreamType.VLESS_REALITY |
| 142 | elif self.part_json["streamSettings"]["security"] == "tls": |
| 143 | origin_protocol = StreamType.VLESS_TLS |
| 144 | else: |
| 145 | origin_protocol = StreamType.VLESS_TCP |
| 146 | elif self.part_json['protocol'] == 'trojan': |
| 147 | origin_protocol = StreamType.TROJAN |
| 148 | |
| 149 | if origin_protocol not in no_tls_group: |
| 150 | security_backup = self.part_json["streamSettings"]["security"] |
| 151 | if origin_protocol == StreamType.VLESS_REALITY: |
| 152 | tls_settings_backup = self.part_json["streamSettings"]["realitySettings"] |
| 153 | else: |
| 154 | tls_settings_backup = self.part_json["streamSettings"]["tlsSettings"] |
| 155 | if "domain" in self.part_json: |
| 156 | domain = self.part_json["domain"] |
| 157 | |
| 158 | #mtproto换成其他协议时, 减少mtproto int和out的路由绑定 |
| 159 | if origin_protocol == StreamType.MTPROTO and origin_protocol != self.stream_type: |
| 160 | clean_mtproto_tag(self.config, self.group_index) |
| 161 | |
| 162 | server = self.load_template('server.json') |
| 163 | server["inbounds"][0]["port"] = self.part_json["port"] |
| 164 | server["inbounds"][0]["settings"]["clients"][0]["id"] = str(uuid.uuid4()) |
| 165 | if "allocate" in self.part_json: |
| 166 | server["inbounds"][0]["allocate"] = self.part_json["allocate"] |
| 167 | if "sniffing" in self.part_json: |
| 168 | server["inbounds"][0]["sniffing"] = self.part_json["sniffing"] |
| 169 | self.part_json = server["inbounds"][0] |
| 170 | self.config["inbounds"][self.group_index] = self.part_json |
| 171 | |
| 172 | if self.stream_type.name.startswith('KCP'): |
| 173 | self.to_kcp(self.stream_type.value) |
| 174 | |
| 175 | elif self.stream_type == StreamType.TCP: |
| 176 | self.part_json["streamSettings"] = self.load_template('tcp.json') |
| 177 | |
| 178 | elif self.stream_type == StreamType.TCP_HOST: |
| 179 | http = self.load_template('http.json') |
| 180 | http["tcpSettings"]["header"]["request"]["headers"]["Host"] = kw["host"] |
| 181 | self.part_json["streamSettings"] = http |
| 182 | |
| 183 | elif self.stream_type == StreamType.MTPROTO: |
| 184 | mtproto = self.load_template('mtproto.json') |
no test coverage detected