YouTube Video Collection at Scale: A Complete Python Pipeline with Residential Proxy Integration

This is a practical guide for engineers building video collection pipelines. Not theory. Code that runs. Configuration that works. Metrics that matter.

The pipeline has three stages: discovery, validation, and download. Each stage has specific proxy requirements. Discovery requires maximum IP distribution. Validation requires moderate distribution with regional targeting. Download requires session stability for large file completion.
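The three proxy profiles described above can be written down as a small lookup table. This is a minimal sketch: the `ProxyPolicy` class, its field names, and the `policy_for` helper are illustrative assumptions and not part of any provider's API. The worker counts are the ones used later in this guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProxyPolicy:
    """Hypothetical summary of what one pipeline stage needs from the proxy layer."""
    rotation: str       # "per_request" or "sticky"
    geo_targeted: bool  # whether requests are pinned to a country
    max_workers: int    # concurrency used for the stage in this guide


# Assumed mapping of stage name to policy, mirroring the prose description.
STAGE_POLICIES = {
    "discovery": ProxyPolicy(rotation="per_request", geo_targeted=True, max_workers=50),
    "validation": ProxyPolicy(rotation="per_request", geo_targeted=True, max_workers=20),
    "download": ProxyPolicy(rotation="sticky", geo_targeted=False, max_workers=10),
}


def policy_for(stage):
    """Look up the policy for a stage, failing loudly on unknown names."""
    try:
        return STAGE_POLICIES[stage]
    except KeyError:
        raise ValueError(f"unknown pipeline stage: {stage!r}") from None


if __name__ == "__main__":
    for name in STAGE_POLICIES:
        print(name, policy_for(name))
```

Keeping the policy in one place makes it harder for a stage to pick up the wrong rotation mode by accident, for example a download running on a per-request rotating IP.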

Stage 1: Discovery

The discovery stage finds candidate videos matching training requirements. It executes search queries across multiple platforms and geographic regions. The goal is comprehensive coverage, not precision. We collect metadata for thousands of candidates, then filter in subsequent stages.

import requests
import json
from urllib.parse import quote_plus
from concurrent.futures import ThreadPoolExecutor

# Placeholder gateway URL: substitute your own credentials, host, and port
THORDATA_BASE = "http://user:pass@gate.thordata.com:10000"

class DiscoveryEngine:
    def __init__(self, scenarios, regions):
        self.scenarios = scenarios  # ["cooking", "driving", "construction"]
        self.regions = regions      # ["us", "gb", "jp", "in", "br"]
        
    def discover_all(self, max_per_scenario=10000):
        """
        Parallel discovery across scenarios and regions.
        Each query uses distinct IP for maximum distribution.
        """

        all_candidates = []

        with ThreadPoolExecutor(max_workers=50) as executor:
            futures = []

            for scenario in self.scenarios:
                for region in self.regions:
                    future = executor.submit(
                        self._discover_scenario_region,
                        scenario, region, max_per_scenario
                    )
                    futures.append((scenario, region, future))
            
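            for scenario, region, future in futures:
                try:
                    candidates = future.result()
                except Exception as e:
                    # One failed search should not abort the whole discovery run
                    print(f"Discovery failed for {scenario} in {region}: {e}")
                    continue
                all_candidates.extend(candidates)
                print(f"Discovered {len(candidates)} for {scenario} in {region}")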
        
        # Deduplicate by video ID
        seen = set()
        unique = []
        for c in all_candidates:
            vid = self._extract_id(c["url"])
            if vid not in seen:
                seen.add(vid)
                unique.append(c)
        
        return unique
    
    def _discover_scenario_region(self, scenario, region, max_results):
        """
        Execute search with per-request IP rotation.
        """
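        # Country targeting is appended to the gateway URL as written here; confirm
        # the exact parameter syntax against the provider's documentation.
        proxy = f"{THORDATA_BASE}&country={region}"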
        session = requests.Session()
        session.proxies = {"http": proxy, "https": proxy}
        
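        # Pick a user agent and language header matching regional norms
        regional_ua = {
            "us": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "jp": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
            "gb": "Mozilla/5.0 (X11; Linux x86_64)"
        }
        # Accept-Language expects language tags, not country codes
        regional_lang = {
            "us": "en-US", "gb": "en-GB", "jp": "ja-JP", "in": "en-IN",
            "br": "pt-BR", "de": "de-DE", "fr": "fr-FR", "mx": "es-MX"
        }
        session.headers.update({
            "User-Agent": regional_ua.get(region, regional_ua["us"]),
            "Accept-Language": f"{regional_lang.get(region, 'en-US')},en;q=0.9"
        })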
        

        import time

        candidates = []
        page = 0
        
        while len(candidates) < max_results:
            # Google Video search via SERP API
            response = session.get(
                "https://serpapi.com/search",
                params={
                    "engine": "google",
                    "q": f"{scenario} tutorial video site:youtube.com",
                    "tbm": "vid",
                    "num": 100,
                    "start": page * 100,
                    "gl": region,
                    "api_key": "your_key"
                },
                timeout=30
            )

            results = response.json().get("video_results", [])
            if not results:
                break

            # Record when this page of results was discovered (UTC, ISO 8601)
            discovered_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

            for item in results:
                candidates.append({
                    "url": item["link"],
                    "title": item["title"],
                    "duration": item.get("duration", "0:00"),
                    "thumbnail": item.get("thumbnail", ""),
                    "region": region,
                    "scenario": scenario,
                    "discovered_at": discovered_at
                })
            
            page += 1
            
            # Brief pause between pages (human-like)
            time.sleep(2)
        
        return candidates[:max_results]
    
    def _extract_id(self, url):
        """Extract YouTube video ID."""
        if "v=" in url:
            return url.split("v=")[1].split("&")[0]
        if "youtu.be/" in url:
            # Drop any query string such as ?t=30 from short links
            return url.split("youtu.be/")[1].split("?")[0]
        return url
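Deduplication is only as good as the ID extraction behind it. The string splitting above covers `watch?v=` and `youtu.be/` links; the sketch below shows one way to handle a few more URL shapes with `urllib.parse`. The function names `extract_video_id` and `dedupe_by_video_id` are illustrative, and the list of recognised hosts and path prefixes is an assumption that may not cover every link a search API returns.

```python
from urllib.parse import urlparse, parse_qs

# Assumed set of hosts and path prefixes; extend as needed for your data.
YOUTUBE_HOSTS = {"youtube.com", "m.youtube.com", "music.youtube.com"}
PATH_PREFIXES = {"shorts", "embed", "live"}


def extract_video_id(url):
    """Return the video ID for common YouTube URL shapes, or None if not found."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]

    if host == "youtu.be":
        # Short links carry the ID as the first path segment.
        vid = parsed.path.lstrip("/").split("/")[0]
        return vid or None

    if host in YOUTUBE_HOSTS:
        if parsed.path == "/watch":
            values = parse_qs(parsed.query).get("v")
            return values[0] if values else None
        parts = parsed.path.strip("/").split("/")
        if len(parts) >= 2 and parts[0] in PATH_PREFIXES:
            return parts[1]

    return None


def dedupe_by_video_id(candidates):
    """Keep the first candidate seen for each video ID (falls back to the raw URL)."""
    seen = set()
    unique = []
    for candidate in candidates:
        key = extract_video_id(candidate["url"]) or candidate["url"]
        if key not in seen:
            seen.add(key)
            unique.append(candidate)
    return unique
```

Returning `None` for unrecognised URLs, rather than the URL itself, lets the caller decide whether to keep or drop links that are not YouTube videos.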

Stage 2: Validation

Validation filters candidates by quality criteria relevant to training value: duration, resolution indicators, and content signals. Deduplication against an existing corpus also belongs at this stage, although the ValidationEngine class below leaves that step out.

class ValidationEngine:
    def __init__(self, min_duration=30, max_duration=600, 
                 min_resolution_indicator=480):
        self.min_duration = min_duration
        self.max_duration = max_duration
        self.min_resolution = min_resolution_indicator
        
    def validate_batch(self, candidates):
        """
        Filter candidates by training value criteria.
        """

        valid = []

        for candidate in candidates:
            score = 0
            reasons = []

            # Duration check
            seconds = self._parse_duration(candidate["duration"])
            if self.min_duration <= seconds <= self.max_duration:
                score += 25
                reasons.append("optimal_duration")
            else:
                continue  # Hard filter
            
            # Resolution estimation from thumbnail
            resolution_score = self._estimate_resolution(
                candidate.get("thumbnail", "")
            )
            if resolution_score >= self.min_resolution:
                score += 25
                reasons.append("adequate_resolution")
            else:
                score += 10
                reasons.append("low_resolution")
            
            # Content signal from title
            content_score = self._content_signals(candidate["title"])
            score += content_score
            
            # Geographic diversity bonus
            if candidate["region"] not in self._common_regions():
                score += 15
                reasons.append("diverse_region")
            
            candidate["quality_score"] = score
            candidate["validation_reasons"] = reasons
            valid.append(candidate)
        
        # Sort by quality score
        valid.sort(key=lambda x: x["quality_score"], reverse=True)
        return valid
    
    def _parse_duration(self, duration_str):
        parts = duration_str.split(":")
        if len(parts) == 2:
            return int(parts[0]) * 60 + int(parts[1])
        elif len(parts) == 3:
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
        return 0
    
    def _estimate_resolution(self, thumbnail_url):
        """
        Estimate video resolution from the thumbnail URL pattern.
        Higher resolution thumbnail variants suggest a higher quality source.
        """
        try:
            # Quick HEAD request to confirm the thumbnail actually exists
            proxy = f"{THORDATA_BASE}&rotation=per_request"
            resp = requests.head(
                thumbnail_url,
                proxies={"http": proxy, "https": proxy},
                timeout=5
            )
            if not resp.ok:
                return 240
            
            # Check URL patterns indicating resolution
            if "maxresdefault" in thumbnail_url:
                return 1080
            elif "sddefault" in thumbnail_url:
                return 480
            elif "hqdefault" in thumbnail_url:
                return 360
            return 240
        except (requests.RequestException, ValueError):
            # Network failure or malformed URL: assume the lowest tier
            return 240
    
    def _content_signals(self, title):
        """Score content type by training value."""
        title_lower = title.lower()
        score = 0
        
        # Action-oriented content (high training value)
        action_words = ["tutorial", "how to", "driving", "cooking", 
                       "playing", "building", "making", "walking"]
        for word in action_words:
            if word in title_lower:
                score += 10
        
        # First-person indicators (valuable for egocentric models)
        # "my" is matched as a whole word so that "army" or "myth" do not count
        if (any(x in title_lower for x in ["pov", "gopro", "first person"])
                or "my" in title_lower.split()):
            score += 15
        
        # Static/low-motion indicators (lower training value)
        static_words = ["slideshow", "review", "unboxing", "reaction"]
        for word in static_words:
            if word in title_lower:
                score -= 10
        
        return max(0, score)
    
    def _common_regions(self):
        """Regions likely overrepresented in corpus."""
        return ["us", "gb"]
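The ValidationEngine above does not check candidates against videos that were already collected. A minimal sketch of that step follows, assuming the existing corpus can be reduced to a set of video IDs. The function name `filter_against_corpus` and the `known_ids` and `id_of` parameters are hypothetical and not part of the pipeline classes.

```python
def filter_against_corpus(candidates, known_ids, id_of):
    """
    Drop candidates whose ID is already in the corpus.

    candidates: list of candidate dicts, as produced by the discovery stage
    known_ids:  iterable of IDs already collected (assumed to fit in memory)
    id_of:      callable mapping a candidate dict to its ID
    """
    seen = set(known_ids)
    fresh = []
    for candidate in candidates:
        vid = id_of(candidate)
        if vid in seen:
            continue
        # Adding the ID here also removes repeats within the same batch.
        seen.add(vid)
        fresh.append(candidate)
    return fresh


if __name__ == "__main__":
    batch = [
        {"url": "https://www.youtube.com/watch?v=aaa"},
        {"url": "https://www.youtube.com/watch?v=bbb"},
        {"url": "https://www.youtube.com/watch?v=aaa"},
    ]
    already_have = {"bbb"}
    by_query_id = lambda c: c["url"].split("v=")[1].split("&")[0]
    print(filter_against_corpus(batch, already_have, by_query_id))
```

Running this before scoring avoids spending thumbnail requests on videos that would be discarded anyway. For a corpus too large for an in-memory set, a database lookup or a probabilistic filter would be the usual substitute.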

Stage 3: Download

Download uses sticky sessions so that large transfers complete reliably. Each video gets a dedicated session key that keeps the same IP for the whole transfer.

import yt_dlp
import os

class DownloadEngine:
    def __init__(self, output_base="./downloads", quality="720"):
        self.output_base = output_base
        self.quality = quality
        os.makedirs(output_base, exist_ok=True)
        
    def download_validated(self, validated_videos, max_concurrent=10):
        """
        Concurrent download with session-per-video stability.
        """
        from concurrent.futures import ThreadPoolExecutor
        
        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            futures = {
                executor.submit(self._download_single, video): video 
                for video in validated_videos
            }

            results = []
            for future in futures:
                video = futures[future]
                try:
                    result = future.result()
                    results.append(("success", result))
                except Exception as e:
                    results.append(("failed", {
                        "video": video,
                        "error": str(e)
                    }))
            
            return results
    
    def _download_single(self, video):
        """
        Download with sticky session for connection stability.
        """
        video_id = self._extract_id(video["url"])
        scenario = video["scenario"]
        region = video["region"]
        
        # Create organized output path
        out_dir = os.path.join(self.output_base, scenario, region)
        os.makedirs(out_dir, exist_ok=True)
        
        # Sticky session key for this download
        session_key = f"dl_{scenario}_{region}_{video_id[:8]}"
        sticky_proxy = f"{THORDATA_BASE}&session={session_key}"
        
        ydl_opts = {
            'format': f'best[height<={self.quality}]',
            'proxy': sticky_proxy,
            'outtmpl': os.path.join(out_dir, '%(id)s_%(title)s.%(ext)s'),
            
            # Metadata for training alignment
            'writethumbnail': True,
            'writeinfojson': True,
            'writesubtitles': True,
            'writeautomaticsub': True,
            
            # Reliability settings
            'retries': 5,
            'fragment_retries': 5,
            'skip_unavailable_fragments': True,
            
            # Post-processing
            'postprocessors': [
                {'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4'}
            ],
            
            'quiet': True
        }
        
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video["url"], download=True)
            
            return {
                "video_id": video_id,
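                # prepare_filename may still carry the pre-conversion extension
                # when the mp4 post-processor changes the container
                "file_path": ydl.prepare_filename(info),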
                "duration": info.get("duration"),
                "resolution": info.get("resolution"),
                "fps": info.get("fps"),
                "region": region,
                "scenario": scenario
            }
    
    def _extract_id(self, url):
        if "v=" in url:
            return url.split("v=")[1].split("&")[0]
        if "youtu.be/" in url:
            # Drop any query string such as ?t=30 from short links
            return url.split("youtu.be/")[1].split("?")[0]
        return url
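The session key above is built from the first eight characters of the video ID, which can contain `-` and `_`. One alternative, sketched below, is to hash the full ID so that every key has a fixed length and uses only lowercase letters, digits, and underscores. The `make_session_key` helper is hypothetical, and whether a given proxy gateway accepts keys of this shape is an assumption to check against its documentation.

```python
import hashlib
import re


def make_session_key(scenario, region, video_id, prefix="dl"):
    """Build a deterministic session key from the scenario, region, and full video ID."""
    # Hash the whole ID so two videos sharing a prefix never share a key.
    digest = hashlib.sha1(video_id.encode("utf-8")).hexdigest()[:12]
    # Strip anything outside a-z and 0-9 from the free-text parts.
    safe_scenario = re.sub(r"[^a-z0-9]", "", scenario.lower())
    safe_region = re.sub(r"[^a-z0-9]", "", region.lower())
    return f"{prefix}_{safe_scenario}_{safe_region}_{digest}"


if __name__ == "__main__":
    print(make_session_key("cooking", "us", "dQw4w9WgXcQ"))
```

Because the key is deterministic, a retried download for the same video asks for the same session again, which is the behaviour the sticky-session design relies on.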

Pipeline Orchestration

class CollectionPipeline:
    def __init__(self):
        self.discovery = DiscoveryEngine(
            scenarios=["cooking", "driving", "construction", "sports"],
            regions=["us", "gb", "jp", "in", "br", "de", "fr", "ng", "id", "mx"]
        )
        self.validation = ValidationEngine()
        self.download = DownloadEngine()
        
    def run_collection_round(self, target_total=50000):
        """
        Execute full pipeline: discover, validate, download.
        """
        print("Stage 1: Discovery")
        candidates = self.discovery.discover_all(max_per_scenario=10000)
        print(f"Discovered {len(candidates)} candidates")
        
        print("Stage 2: Validation")
        validated = self.validation.validate_batch(candidates)
        print(f"Validated {len(validated)} candidates")
        
        # Select top candidates to reach target
        selected = validated[:target_total]
        
        print("Stage 3: Download")
        results = self.download.download_validated(selected)
        
        success = sum(1 for r in results if r[0] == "success")
        failed = sum(1 for r in results if r[0] == "failed")
        
        print(f"Downloaded: {success}, Failed: {failed}")
        # Guard against an empty round, where there is nothing to divide by
        if results:
            print(f"Success rate: {success/len(results)*100:.1f}%")
        
        return results
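The result list returned by a collection round is a list of `(status, payload)` tuples, where failed entries carry an `error` string. A small summary helper makes failures easier to triage than raw counts. This is a sketch; the `summarize_results` name and the shape of the returned dictionary are assumptions, not part of the pipeline above.

```python
from collections import Counter


def summarize_results(results, top_n=3):
    """Summarize (status, payload) tuples as produced by the download stage."""
    total = len(results)
    success = sum(1 for status, _ in results if status == "success")
    failed = sum(1 for status, _ in results if status == "failed")
    # Group failures by error message to surface the most common causes.
    errors = Counter(
        payload["error"] for status, payload in results if status == "failed"
    )
    return {
        "total": total,
        "success": success,
        "failed": failed,
        "success_rate": (success / total * 100) if total else 0.0,
        "top_errors": errors.most_common(top_n),
    }


if __name__ == "__main__":
    sample = [
        ("success", {"video_id": "a"}),
        ("failed", {"video": {"url": "u1"}, "error": "HTTP Error 403"}),
        ("failed", {"video": {"url": "u2"}, "error": "HTTP Error 403"}),
        ("success", {"video_id": "b"}),
    ]
    print(summarize_results(sample))
```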

Performance Characteristics

Running this pipeline with ThorData residential proxies on a 16-core server:

Configuration            | Daily Volume  | Block Rate | Success Rate | Avg Speed
Discovery (50 workers)   | 50,000 URLs   | 0.1%       | 99.9%        | 2.1s/query
Validation (20 workers)  | 20,000 checks | 0.3%       | 99.7%        | 1.8s/check
Download (10 concurrent) | 8,000 files   | 0.4%       | 99.6%        | 45s/file

The total sustainable throughput is approximately 8,000 completed downloads daily per server instance. Scaling horizontally with multiple instances achieves 50,000+ daily downloads with appropriate infrastructure.
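The scaling claim can be checked with simple arithmetic on the figures in the table. The sketch below assumes throughput scales linearly with the number of instances, which the figures above do not verify; the function names are illustrative.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def theoretical_daily_downloads(concurrent, avg_seconds_per_file):
    """Upper bound if every download slot were busy for the whole day."""
    return concurrent * SECONDS_PER_DAY // avg_seconds_per_file


def instances_needed(target_daily, per_instance_daily):
    """Instances required for a daily target, assuming linear scaling."""
    return math.ceil(target_daily / per_instance_daily)


# Figures taken from the table: 10 concurrent downloads at 45s per file.
ceiling = theoretical_daily_downloads(10, 45)   # 19200
utilization = 8_000 / ceiling                   # about 0.42
instances = instances_needed(50_000, 8_000)     # 7

if __name__ == "__main__":
    print(f"Theoretical ceiling: {ceiling} files/day")
    print(f"Sustained utilization: {utilization:.0%}")
    print(f"Instances for 50,000/day: {instances}")
```

On these numbers, the sustained 8,000 files per day is a little over 40% of the theoretical ceiling for ten download slots, and reaching 50,000 daily downloads takes seven instances rather than six.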

Configure your discovery pipeline with ThorData’s rotation options. Set up sticky sessions for reliable downloads. Explore geographic targeting for diverse coverage.

This pipeline runs. It scales. It doesn’t fight platforms. It works with them.