Untitled
unknown
plain_text
2 years ago
10 kB
11
Indexable
def __sort_countri(self,country=None,text=None):
"""
This will Ignore all extra country taggings and put only when needed.
by ~ HDC
"""
tex = {
'CC': 'CC',
'SDH': 'SDH',
'Forced': 'Forced',
'Hong Kong': 'Hong Kong'
}
dic = {
'en-GB': 'United Kingdom',
'fr-CA': 'Canadian',
'pt-BR': 'Brazilian',
'zh-Hans': 'Simplified',
'zh-Hant': 'Traditional',
'fr-BG': 'Belgium',
'fr-LU': 'Luxembourg',
'es-150': 'European',
'nl-BE': 'Belgium',
'nl-NL': 'Netherlands',
'es-419': 'Latin American',
'zh-yue-Hant': 'Cantonese (Traditional)',
'zh-yue-Hans': 'Cantonese (Simplified)'
}
sic = {
'United Kingdom (SDH)': 'United Kingdom',
'Traditional (SDH)': 'Traditional',
'Simplified (SDH)': 'Simplified'
}
if self.sources == 'ATVP':
if country is not None:
if country == 'en':
return tex.get(text)
else:
return dic.get(country)
else:
if country is not None:
if text is not None:
new_text = text.split('(')[-1].rstrip(')')
if new_text == 'Forced':
return new_text
elif new_text == 'SDH':
new_sic = sic.get(text)
if new_sic is not None:
return new_sic
else:
return new_text
get_tag = tex.get(text)
if get_tag is not None:
return get_tag
else:
return dic.get(country)
def unbuffered(self, proc, stream="stdout"):
newlines = ["\n", "\r\n", "\r"]
stream = getattr(proc, stream)
with contextlib.closing(stream):
while True:
out = []
last = stream.read(1)
# Don't loop forever
if last == "" and proc.poll() is not None:
break
while last not in newlines:
# Don't loop forever
if last == "" and proc.poll() is not None:
break
out.append(last)
last = stream.read(1)
out = "".join(out)
yield out
def mux(self, prefix: str, hybrid) -> tuple[Path, int]:
"""
Takes the Video, Audio and Subtitle Tracks, and muxes them into an MKV file.
It will attempt to detect Forced/Default tracks, and will try to parse the language codes of the Tracks
"""
log = Logger.getLogger("Tracks")
if self.videos:
hybird_location = None
if hybrid:
dv_name = str(self.videos[-1].locate())
hdr_out = dv_name.replace('.mp4','.hevc')
self.DV_INJECTION(self.videos[0].locate(),self.videos[-1].locate())
hybird_location = hdr_out
#print(muxed_hybird_location)
if hybird_location:
muxed_location = Path(hybird_location)
else: muxed_location = self.videos[0].locate()
#print(muxed_location)
#log.debug(muxed_location)
#log.debug(hybird_location)
if not muxed_location:
raise ValueError("The provided video track has not yet been downloaded.")
muxed_location = muxed_location.with_suffix(".muxed.mkv")
elif self.audio:
muxed_location = self.audio[0].locate()
if not muxed_location:
raise ValueError("A provided audio track has not yet been downloaded.")
muxed_location = muxed_location.with_suffix(".muxed.mka")
elif self.subtitles:
muxed_location = self.subtitles[0].locate()
if not muxed_location:
raise ValueError("A provided subtitle track has not yet been downloaded.")
muxed_location = muxed_location.with_suffix(".muxed.mks")
elif self.chapters:
muxed_location = Path(str(config.filenames.chapters).format(filename=prefix))
if not muxed_location:
raise ValueError("A provided chapter has not yet been downloaded.")
muxed_location = muxed_location.with_suffix(".muxed.mkv")
else:
raise ValueError("No tracks provided, at least one track must be provided.")
with open('source.json', 'r') as fils:
v = json.loads(fils.read())
self.sources = v.get('source')
fils.close()
#print('Detected Source: {}'.format(self.sources))
cl = [
"mkvmerge",
"--no-date",
"--priority",
"highest",
"--output",
str(muxed_location)
]
sdh_removed_subs = []
if hybrid:
self.videos = [self.videos[0]]
#print(self.videos)
for i, vt in enumerate(self.videos):
location = vt.locate()
if hybird_location:
location = hybird_location
if not location:
raise ValueError("Somehow a Video Track was not downloaded before muxing...")
mediainfo = MediaInfo.parse(vt.locate())
if mediainfo.video_tracks[0].encryption:
log.exit(f" x Video track is not properly decrypted")
original_framerate = (
mediainfo.tracks[1].other_frame_rate[0].split("(")[1].split(")")[0]
if "(" in mediainfo.tracks[1].other_frame_rate[0]
else mediainfo.tracks[1].other_frame_rate[0].replace(" FPS", "")
)
cl.extend([
#"--language", "0:{}".format(LANGUAGE_MUX_MAP.get(
# str(vt.language), str(vt.language)
#)),
"--default-track", f"0:{i == 0}",
"--original-flag", f"0:{vt.is_original_lang}",
"--compression", "0:none", # disable extra compression
"--default-duration",
f"0:{original_framerate}fps", # native framerate
"(", str(location), ")"
])
for i, at in enumerate(self.audio):
location = at.locate()
if not location:
raise ValueError("Somehow an Audio Track was not downloaded before muxing...")
cl.extend([
"--language", "0:{}".format(LANGUAGE_MUX_MAP.get(
str(at.language), str(at.language)
)),
"--default-track", f"0:{i == 0}",
"--visual-impaired-flag", f"0:{at.descriptive}",
"--original-flag", f"0:{at.is_original_lang}",
"--compression", "0:none", # disable extra compression
"(", str(location), ")"
])
for st in self.subtitles:
location = st.locate()
if not location:
raise ValueError("Somehow a Text Track was not downloaded before muxing...")
#default = bool(self.audio and is_close_match(st.language, [self.audio[0].language]) and st.forced)
tname = st.get_track_name()
if format(LANGUAGE_MUX_MAP.get(str(st.language), st.language.to_alpha3())) == "eng":
if tname is not None:
if 'SDH' in str(tname) and self.sources not in ["ATVP"]:
# if 'SDH' in str(tname) and self.sources not in ["ATVP", "NF"]:
if len(str(tname).split(',')) > 1:
cl.extend(["--track-name", f"0:{str(tname).split(',')[0] or ''}"])
output_name = self.convert_to_normal(location)
cl.extend([
"--language", "0:{}".format(st.language),
"--sub-charset", "0:UTF-8",
"--default-track", "0:no",
"--compression", "0:none",
output_name
])
cl.extend([
"--track-name", f"0:{tname or ''}",
"--language", "0:{}".format(LANGUAGE_MUX_MAP.get(
str(st.language), str(st.language)
)),
"--sub-charset", "0:UTF-8",
"--forced-track", f"0:{st.forced}",
"--default-track", "0:no",
"--hearing-impaired-flag", f"0:{st.sdh}",
"--original-flag", f"0:{st.is_original_lang}",
"--compression", "0:none", # disable extra compression (probably zlib)
"(", str(location), ")"
])
for st in self.subtitles:
location = st.locate()
if not location:
raise ValueError("Somehow a Text Track was not downloaded before muxing...")
#default = bool(self.audio and is_close_match(st.language, [self.audio[0].language]) and st.forced)
if format(LANGUAGE_MUX_MAP.get(str(st.language), st.language.to_alpha3())) != "eng":
tname = st.get_track_name()
cl.extend([
"--track-name", f"0:{tname or ''}",
"--language", "0:{}".format(LANGUAGE_MUX_MAP.get(
str(st.language), str(st.language)
)),
"--sub-charset", "0:UTF-8",
"--forced-track", f"0:{st.forced}",
"--default-track", "0:no",
"--hearing-impaired-flag", f"0:{st.sdh}",
"--original-flag", f"0:{st.is_original_lang}",
"--compression", "0:none", # disable extra compression (probably zlib)
"(", str(location), ")"
])
Editor is loading...
Leave a Comment