我爱it学习

 找回密码
 立即注册
搜索
查看: 36|回复: 0

[Python] m3u8url解析

[复制链接]

8

主题

2

回帖

55

积分

注册会员

Rank: 2

积分
55
发表于 前天 15:35 | 显示全部楼层 |阅读模式
"""
M3U8视频安全下载合并工具
功能:
1. 全量下载所有视频片段到临时目录
2. 完整性校验并提示缺失片段
3. 按顺序合并存在片段(边合并边清理)
4. 自动处理异常和重试
"""

import os
import re
import requests
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# ████████ 工具函数 ████████
def sanitize_filename(filename):
    """清理非法文件名字符"""
    return re.sub(r'[\\/*?:"<>|]', "_", filename)

def format_index(index, total):
    """生成统一格式的索引文件名"""
    digits = len(str(total))
    return f"{index:0{digits}d}.ts"

# ████████ 阶段1:下载所有片段 ████████
class DownloadManager:
    def __init__(self, m3u8_url, temp_folder="temp_ts"):
        self.m3u8_url = m3u8_url
        self.temp_folder = temp_folder
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{urlparse(m3u8_url).scheme}://{urlparse(m3u8_url).netloc}/'
        }
        os.makedirs(temp_folder, exist_ok=True)

    def _get_retry_session(self):
        """创建带重试机制的Session"""
        session = requests.Session()
        retries = 5
        backoff_factor = 0.3
        status_forcelist = [500, 502, 503, 504]

        retry = requests.packages.urllib3.util.Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        adapter = requests.adapters.HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def parse_m3u8(self):
        """解析M3U8获取片段列表"""
        try:
            response = requests.get(self.m3u8_url, headers=self.headers, timeout=10)
            response.raise_for_status()
            lines = response.text.splitlines()

            total = len([l for l in lines if l.strip() and not l.startswith('#')])
            segments = []
            for index, line in enumerate(lines):
                line = line.strip()
                if line and not line.startswith('#'):
                    url = urljoin(self.m3u8_url, line)
                    filename = format_index(index, total)
                    segments.append({
                        'url': url,
                        'filename': filename,
                        'save_path': os.path.join(self.temp_folder, filename)
                    })
            return segments
        except Exception as e:
            raise RuntimeError(f"M3U8解析失败: {str(e)}")

    def download_all(self, max_workers=8):
        """多线程下载所有片段"""
        segments = self.parse_m3u8()
        print(f"需要下载总片段数: {len(segments)}")

        with tqdm(total=len(segments), desc="下载进度") as pbar:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                session = self._get_retry_session()

                for seg in segments:
                    # 跳过已存在的文件
                    if os.path.exists(seg['save_path']):
                        pbar.update(1)
                        continue

                    future = executor.submit(
                        self._download_single,
                        session=session,
                        url=seg['url'],
                        save_path=seg['save_path']
                    )
                    futures.append(future)

                for future in as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        print(f"\n下载错误: {str(e)}")
                    pbar.update(1)

        return len(segments)

    def _download_single(self, session, url, save_path):
        """下载单个片段"""
        try:
            response = session.get(url, headers=self.headers, timeout=20)
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                f.write(response.content)
        except Exception as e:
            raise RuntimeError(f"下载失败 {os.path.basename(save_path)}: {str(e)}")

# ████████ 阶段2:完整性校验 ████████
def check_missing(total, temp_folder):
    """检查缺失片段"""
    missing = []
    for index in range(total):
        filename = format_index(index, total)
        path = os.path.join(temp_folder, filename)
        if not os.path.exists(path):
            missing.append(index)

    if missing:
        print(f"警告:缺失 {len(missing)} 个片段 (示例: {missing[:5]}...)")
        choice = input("是否继续合并?(y/n): ").lower()
        if choice != 'y':
            exit()
    return missing

# ████████ 阶段3:安全合并 ████████
class SafeMerger:
    def __init__(self, temp_folder, output_file):
        self.temp_folder = temp_folder
        self.output_file = output_file
        self.total_files = len([f for f in os.listdir(temp_folder) if f.endswith('.ts')])

    def merge(self):
        """按顺序合并并删除临时文件"""
        sorted_files = sorted(
            [f for f in os.listdir(self.temp_folder) if f.endswith('.ts')],
            key=lambda x: int(x.split('.')[0])
        )

        with open(self.output_file, 'wb') as outfile, \
             tqdm(total=self.total_files, desc="合并进度") as pbar:

            for filename in sorted_files:
                file_path = os.path.join(self.temp_folder, filename)
                try:
                    with open(file_path, 'rb') as infile:
                        outfile.write(infile.read())
                    os.remove(file_path)  # 立即删除已合并文件
                    pbar.update(1)
                except Exception as e:
                    print(f"\n合并失败 {filename}: {str(e)}")

        # 清理空目录
        try:
            os.rmdir(self.temp_folder)
        except OSError:
            pass

# ████████ 主流程 ████████
def main(m3u8_url, output_file="output.mp4", temp_folder="temp_ts", max_workers=8):
    # 阶段1:下载
    downloader = DownloadManager(m3u8_url, temp_folder)
    total = downloader.download_all(max_workers)

    # 阶段2:校验
    missing = check_missing(total, temp_folder)

    # 阶段3:合并
    merger = SafeMerger(temp_folder, output_file)
    merger.merge()

    print(f"视频合并完成!保存至: {output_file}")
    if missing:
        print(f"注意:缺失 {len(missing)} 个片段,视频可能不完整")

if __name__ == '__main__':
    # 使用示例
    main(
        m3u8_url="https://surrit.com/e6847292-3317-4ca0-ae58-0b02aa5e6bba/1080p/video.m3u8",
        output_file="final_video2.mp4",
        temp_folder="download_cache",
        max_workers=10
    )
免责声明:
1、论坛里的文章仅代表作者本人的观点,与本网站立场无关。出于遵守国家相关法规或促进论坛发展的前提,我们有权在不经作者准许的情况下删除其在【我爱it学习】所发表的文章。
2、论坛的所有文章、内容、信息、资料,都不保证其准确性、完整性、有效性、时效性。请依据情况自身做出判断。因阅读本站内容而被误导等其他因素所造成的损失责任自负。【我爱it学习】不承担任何责任。
3、坛友对自己的言论和行为负责,完全承担发表内容的责任,所持立场与【我爱it学习】论坛无关。论坛使用者因为任何行为而触犯中华人民共和国法律或相关法规的,一切后果自己负责,【我爱it学习】不承担任何责任。
4、坛友所发布的信息中涉及到具体的第三方个人(单位/公司)隐私、商业秘密等,侵犯其权益,对其构成不良影响的,由第三方向【我爱it学习】提交正式书面申请删除该信息后,【我爱it学习】有权将该信息予以直接删除处理。
5、如因系统维护或升级而需暂停服务时,将事先公告。若因线路及非本站点控制范围外的硬件故障或其它不可抗力而导致暂停服务,于暂停服务期间造成的一切不便与损失,【我爱it学习】不负任何责任。
6、凡以任何方式登陆本站或直接、间接使用【我爱it学习】论坛资料者,视为自愿接受【我爱it学习】论坛总规则的约束。本声明未涉及的问题参见国家有关法律法规,当本声明与国家法律法规冲突时,以国家法律法规为准。
7、【我爱it学习】所发布的一切文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。如有侵权请邮件与我们联系处理,Mail To: [email protected]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

侵权投诉|Archiver|小黑屋|我爱it学习

GMT+8, 2025-6-19 12:04

Powered by Discuz!

© 2001-2023 52itstudy.

快速回复 返回顶部 返回列表