Langage : Python
 Édité le 5 avril 2022
Télécharger | Reposter
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Tue Apr 5 15:05:25 2022 @author: cubicibo """ from enum import Enum from struct import unpack, pack import logging import sys from os import path class PGSegment: class PGSOff(Enum): MAGIC_HEADER = slice(0, 2) PRES_TS = slice(2, 6) DECODE_TS = slice(6, 10) SEG_TYPE = 10 SEG_LENGTH = slice(11,13) _HEADER_LEN: int = 13 MAGIC: bytes = b"PG" SEGMENT: dict[int, str] = {0x14: 'PDS', 0x15: 'ODS', 0x16: 'PCS', 0x17: 'WDS', 0x80: 'END'} def __init__(self, data: bytes) -> None: if __class__.MAGIC != data[__class__.PGSOff.MAGIC_HEADER.value]: raise ValueError("Expected a PG segment, got " + \ f"{data[__class__.PGSOff.MAGIC_HEADER.value]}.") if len(data) < __class__._HEADER_LEN: raise BufferError("Got too few bytes to analyse stream.") length = unpack(">H", data[__class__.PGSOff.SEG_LENGTH.value])[0] seg_data = data[__class__._HEADER_LEN:length+__class__._HEADER_LEN] if len(seg_data) < length: raise EOFError(f"Payload is missing {length-len(seg_data)} bytes.") self._bytes = data[:length+__class__._HEADER_LEN] @property def pts_ts(self) -> float: return unpack(">I", self._bytes[__class__.PGSOff.PRES_TS.value])[0]/90e3 @pts_ts.setter def pts_ts(self, ts: float) -> None: self._bytes[__class__.PGSOff.PRES_TS.value] = pack(">I", int(ts*90e3)) @property def dts_ts(self) -> float: return unpack(">I", self._bytes[__class__.PGSOff.DECODE_TS.value])[0]/90e3 @dts_ts.setter def dts_ts(self, ts: float) -> None: self._bytes[__class__.PGSOff.DECODE_TS.value] = pack(">I",int(round(ts*90e3))) @property def type(self) -> str: return __class__.SEGMENT[self._bytes[__class__.PGSOff.SEG_TYPE.value]] @property def payload(self) -> bytes: return self._bytes[__class__._HEADER_LEN:] @property def size(self) -> int: return unpack(">H", self._bytes[__class__.PGSOff.SEG_LENGTH.value])[0] def __str__(self): return f"{self.type} at {self.pts_ts}[s], {self.size} bytes." def __len__(self): return len(self._bytes) def __equ__(self, other): return self._bytes == other._bytes class SupStream: BUFFER_N_BYTES = 1048576 def __init__(self, data: str) -> None: """ Manage a Sup Stream from a file, bytestring or an actual data stream. Stop iterating once the buffer is consumed. """ self.stream = open(data, 'rb') self.s_index = 0 self._data = bytearray() def renew(self) -> None: len_before = len(self._data) self._data += self.stream.read(__class__.BUFFER_N_BYTES) read_back = len(self._data) - len_before self.s_index += read_back return read_back def fetch_segment(self) -> PGSegment: """ Generator of PGS segment in the specified stream. :return: PGSegment """ if self.s_index == -1 or self.stream.closed: raise Exception("Attempting to use a closed datastream.") while True: if len(self._data) == 0 and not self.renew(): return try: seg = PGSegment(self._data) self._data = self._data[len(seg):] yield seg except EOFError: if self.renew() == 0: self.close() return except (ValueError, BufferError) as e: pg_id = self._data.find(PGSegment.MAGIC) if pg_id == -1: self._data = bytearray() else: self._data = self._data[pg_id:] if type(e) is ValueError: logging.warning("Garbage in PGStream encountered.") def close(self) -> None: self.stream.close() self.s_index = -1 if __name__ == '__main__': if len(sys.argv) < 3: print("Usage:\n python3 pgshift.py IN_FILE MILLISECONDS OUT_FILE [Opt. -dts]\n" " -dts : shift decode timestamps as well\n") sys.exit(0) if not path.exists(sys.argv[1]): raise OSError("File does not appear to exist or path is incorrect.") if path.exists(sys.argv[3]): raise OSError("Output file already exists. Delete it or use another name.") delay = float(int(sys.argv[2]))/1000 print(f"Adding {delay} seconds to timestamps of {sys.argv[1]}") update_dts = '-dts' in sys.argv[4:] with open(sys.argv[3], 'wb') as f: s = SupStream(sys.argv[1]) for seg in s.fetch_segment(): if update_dts: seg.dts_ts += delay seg.pts_ts += delay f.write(seg._bytes)
x
Éditer le texte

Merci d'entrer le mot de passe que vous avez indiqué à la création du texte.

x
Télécharger le texte

Merci de choisir le format du fichier à télécharger.