| 2315 | self.filename = getattr(filename, "name", "No name") |
| 2316 | |
| 2317 | def _write_header(self, pkt): |
| 2318 | # type: (Optional[Union[Packet, bytes]]) -> None |
| 2319 | self.header_present = True |
| 2320 | |
| 2321 | if self.append: |
| 2322 | # Even if prone to race conditions, this seems to be |
| 2323 | # safest way to tell whether the header is already present |
| 2324 | # because we have to handle compressed streams that |
| 2325 | # are not as flexible as basic files |
| 2326 | if self.gz: |
| 2327 | g = gzip.open(self.filename, "rb") # type: _ByteStream |
| 2328 | else: |
| 2329 | g = open(self.filename, "rb") |
| 2330 | try: |
| 2331 | if g.read(16): |
| 2332 | return |
| 2333 | finally: |
| 2334 | g.close() |
| 2335 | |
| 2336 | if not hasattr(self, 'linktype'): |
| 2337 | raise ValueError( |
| 2338 | "linktype could not be guessed. " |
| 2339 | "Please pass a linktype while creating the writer" |
| 2340 | ) |
| 2341 | |
| 2342 | self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501 |
| 2343 | 2, 4, 0, 0, self.snaplen, self.linktype)) |
| 2344 | self.f.flush() |
| 2345 | |
| 2346 | def _write_packet(self, |
| 2347 | packet, # type: Union[bytes, Packet] |