#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 5 15:05:25 2022
@author: cubicibo
"""
from enum import Enum
from struct import unpack, pack
import logging
import sys
from os import path
class PGSegment:
class PGSOff(Enum):
MAGIC_HEADER = slice(0, 2)
PRES_TS = slice(2, 6)
DECODE_TS = slice(6, 10)
SEG_TYPE = 10
SEG_LENGTH = slice(11,13)
_HEADER_LEN: int = 13
MAGIC: bytes = b"PG"
SEGMENT: dict[int, str] = {0x14: 'PDS', 0x15: 'ODS', 0x16: 'PCS',
0x17: 'WDS', 0x80: 'END'}
def __init__(self, data: bytes) -> None:
if __class__.MAGIC != data[__class__.PGSOff.MAGIC_HEADER.value]:
raise ValueError("Expected a PG segment, got " + \
f"{data[__class__.PGSOff.MAGIC_HEADER.value]}.")
if len(data) < __class__._HEADER_LEN:
raise BufferError("Got too few bytes to analyse stream.")
length = unpack(">H", data[__class__.PGSOff.SEG_LENGTH.value])[0]
seg_data = data[__class__._HEADER_LEN:length+__class__._HEADER_LEN]
if len(seg_data) < length:
raise EOFError(f"Payload is missing {length-len(seg_data)} bytes.")
self._bytes = data[:length+__class__._HEADER_LEN]
@property
def pts_ts(self) -> float:
return unpack(">I", self._bytes[__class__.PGSOff.PRES_TS.value])[0]/90e3
@pts_ts.setter
def pts_ts(self, ts: float) -> None:
self._bytes[__class__.PGSOff.PRES_TS.value] = pack(">I", int(ts*90e3))
@property
def dts_ts(self) -> float:
return unpack(">I", self._bytes[__class__.PGSOff.DECODE_TS.value])[0]/90e3
@dts_ts.setter
def dts_ts(self, ts: float) -> None:
self._bytes[__class__.PGSOff.DECODE_TS.value] = pack(">I",int(round(ts*90e3)))
@property
def type(self) -> str:
return __class__.SEGMENT[self._bytes[__class__.PGSOff.SEG_TYPE.value]]
@property
def payload(self) -> bytes:
return self._bytes[__class__._HEADER_LEN:]
@property
def size(self) -> int:
return unpack(">H", self._bytes[__class__.PGSOff.SEG_LENGTH.value])[0]
def __str__(self):
return f"{self.type} at {self.pts_ts}[s], {self.size} bytes."
def __len__(self):
return len(self._bytes)
def __equ__(self, other):
return self._bytes == other._bytes
class SupStream:
BUFFER_N_BYTES = 1048576
def __init__(self, data: str) -> None:
"""
Manage a Sup Stream from a file, bytestring or an actual data stream.
Stop iterating once the buffer is consumed.
"""
self.stream = open(data, 'rb')
self.s_index = 0
self._data = bytearray()
def renew(self) -> None:
len_before = len(self._data)
self._data += self.stream.read(__class__.BUFFER_N_BYTES)
read_back = len(self._data) - len_before
self.s_index += read_back
return read_back
def fetch_segment(self) -> PGSegment:
"""
Generator of PGS segment in the specified stream.
:return: PGSegment
"""
if self.s_index == -1 or self.stream.closed:
raise Exception("Attempting to use a closed datastream.")
while True:
if len(self._data) == 0 and not self.renew():
return
try:
seg = PGSegment(self._data)
self._data = self._data[len(seg):]
yield seg
except EOFError:
if self.renew() == 0:
self.close()
return
except (ValueError, BufferError) as e:
pg_id = self._data.find(PGSegment.MAGIC)
if pg_id == -1:
self._data = bytearray()
else:
self._data = self._data[pg_id:]
if type(e) is ValueError:
logging.warning("Garbage in PGStream encountered.")
def close(self) -> None:
self.stream.close()
self.s_index = -1
if __name__ == '__main__':
if len(sys.argv) < 3:
print("Usage:\n python3 pgshift.py IN_FILE MILLISECONDS OUT_FILE [Opt. -dts]\n"
" -dts : shift decode timestamps as well\n")
sys.exit(0)
if not path.exists(sys.argv[1]):
raise OSError("File does not appear to exist or path is incorrect.")
if path.exists(sys.argv[3]):
raise OSError("Output file already exists. Delete it or use another name.")
delay = float(int(sys.argv[2]))/1000
print(f"Adding {delay} seconds to timestamps of {sys.argv[1]}")
update_dts = '-dts' in sys.argv[4:]
with open(sys.argv[3], 'wb') as f:
s = SupStream(sys.argv[1])
for seg in s.fetch_segment():
if update_dts: seg.dts_ts += delay
seg.pts_ts += delay
f.write(seg._bytes)