Rallan 发表于 3 天前

m3u8url解析

"""
M3U8视频安全下载合并工具
功能:
1. 全量下载所有视频片段到临时目录
2. 完整性校验并提示缺失片段
3. 按顺序合并存在片段(边合并边清理)
4. 自动处理异常和重试
"""

import os
import re
import requests
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# ████████ 工具函数 ████████
def sanitize_filename(filename):
    """清理非法文件名字符"""
    return re.sub(r'[\\/*?:"<>|]', "_", filename)

def format_index(index, total):
    """生成统一格式的索引文件名"""
    digits = len(str(total))
    return f"{index:0{digits}d}.ts"

# ████████ 阶段1:下载所有片段 ████████
class DownloadManager:
    def __init__(self, m3u8_url, temp_folder="temp_ts"):
      self.m3u8_url = m3u8_url
      self.temp_folder = temp_folder
      self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{urlparse(m3u8_url).scheme}://{urlparse(m3u8_url).netloc}/'
      }
      os.makedirs(temp_folder, exist_ok=True)

    def _get_retry_session(self):
      """创建带重试机制的Session"""
      session = requests.Session()
      retries = 5
      backoff_factor = 0.3
      status_forcelist =

      retry = requests.packages.urllib3.util.Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
      )
      adapter = requests.adapters.HTTPAdapter(max_retries=retry)
      session.mount('http://', adapter)
      session.mount('https://', adapter)
      return session

    def parse_m3u8(self):
      """解析M3U8获取片段列表"""
      try:
            response = requests.get(self.m3u8_url, headers=self.headers, timeout=10)
            response.raise_for_status()
            lines = response.text.splitlines()

            total = len()
            segments = []
            for index, line in enumerate(lines):
                line = line.strip()
                if line and not line.startswith('#'):
                  url = urljoin(self.m3u8_url, line)
                  filename = format_index(index, total)
                  segments.append({
                        'url': url,
                        'filename': filename,
                        'save_path': os.path.join(self.temp_folder, filename)
                  })
            return segments
      except Exception as e:
            raise RuntimeError(f"M3U8解析失败: {str(e)}")

    def download_all(self, max_workers=8):
      """多线程下载所有片段"""
      segments = self.parse_m3u8()
      print(f"需要下载总片段数: {len(segments)}")

      with tqdm(total=len(segments), desc="下载进度") as pbar:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                session = self._get_retry_session()

                for seg in segments:
                  # 跳过已存在的文件
                  if os.path.exists(seg['save_path']):
                        pbar.update(1)
                        continue

                  future = executor.submit(
                        self._download_single,
                        session=session,
                        url=seg['url'],
                        save_path=seg['save_path']
                  )
                  futures.append(future)

                for future in as_completed(futures):
                  try:
                        future.result()
                  except Exception as e:
                        print(f"\n下载错误: {str(e)}")
                  pbar.update(1)

      return len(segments)

    def _download_single(self, session, url, save_path):
      """下载单个片段"""
      try:
            response = session.get(url, headers=self.headers, timeout=20)
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                f.write(response.content)
      except Exception as e:
            raise RuntimeError(f"下载失败 {os.path.basename(save_path)}: {str(e)}")

# ████████ 阶段2:完整性校验 ████████
def check_missing(total, temp_folder):
    """检查缺失片段"""
    missing = []
    for index in range(total):
      filename = format_index(index, total)
      path = os.path.join(temp_folder, filename)
      if not os.path.exists(path):
            missing.append(index)

    if missing:
      print(f"警告:缺失 {len(missing)} 个片段 (示例: {missing[:5]}...)")
      choice = input("是否继续合并?(y/n): ").lower()
      if choice != 'y':
            exit()
    return missing

# ████████ 阶段3:安全合并 ████████
class SafeMerger:
    def __init__(self, temp_folder, output_file):
      self.temp_folder = temp_folder
      self.output_file = output_file
      self.total_files = len()

    def merge(self):
      """按顺序合并并删除临时文件"""
      sorted_files = sorted(
            ,
            key=lambda x: int(x.split('.'))
      )

      with open(self.output_file, 'wb') as outfile, \
             tqdm(total=self.total_files, desc="合并进度") as pbar:

            for filename in sorted_files:
                file_path = os.path.join(self.temp_folder, filename)
                try:
                  with open(file_path, 'rb') as infile:
                        outfile.write(infile.read())
                  os.remove(file_path)# 立即删除已合并文件
                  pbar.update(1)
                except Exception as e:
                  print(f"\n合并失败 {filename}: {str(e)}")

      # 清理空目录
      try:
            os.rmdir(self.temp_folder)
      except OSError:
            pass

# ████████ 主流程 ████████
def main(m3u8_url, output_file="output.mp4", temp_folder="temp_ts", max_workers=8):
    # 阶段1:下载
    downloader = DownloadManager(m3u8_url, temp_folder)
    total = downloader.download_all(max_workers)

    # 阶段2:校验
    missing = check_missing(total, temp_folder)

    # 阶段3:合并
    merger = SafeMerger(temp_folder, output_file)
    merger.merge()

    print(f"视频合并完成!保存至: {output_file}")
    if missing:
      print(f"注意:缺失 {len(missing)} 个片段,视频可能不完整")

if __name__ == '__main__':
    # 使用示例
    main(
      m3u8_url="https://surrit.com/e6847292-3317-4ca0-ae58-0b02aa5e6bba/1080p/video.m3u8",
      output_file="final_video2.mp4",
      temp_folder="download_cache",
      max_workers=10
    )
页: [1]
查看完整版本: m3u8url解析