Performance Optimization Guide¶
Learn how to maximize SpotifyScraper's performance for your use case.
Table of Contents¶
- Performance Basics
- Client Optimization
- Browser Selection
- Caching Strategies
- Batch Processing
- Concurrent Operations
- Memory Management
- Network Optimization
- Profiling & Benchmarking
Performance Basics¶
Understanding Performance Factors¶
- Network Latency: Time to connect to Spotify
- Parsing Time: Time to extract data from HTML/JSON
- Browser Overhead: Selenium vs Requests
- Rate Limiting: Spotify's request limits
- Memory Usage: Data storage and processing
Quick Wins¶
# ✅ Good: create the client once and reuse it for every URL
client = SpotifyClient()
for url in urls:
    data = client.get_track_info(url)

# ❌ Bad: a new client per URL repeats the construction overhead each time
for url in urls:
    client = SpotifyClient()  # Overhead!
    data = client.get_track_info(url)
Client Optimization¶
Client Reuse¶
Always reuse SpotifyClient instances:
class MusicAnalyzer:
    """Analyze many tracks through one shared ``SpotifyClient`` instance."""

    def __init__(self):
        # Create once, use many times
        self.client = SpotifyClient()

    def process(self, data):
        """Post-process the data fetched for one track.

        This is a hook: the default returns ``data`` unchanged, and
        subclasses override it with their own analysis.
        """
        return data

    def analyze_tracks(self, urls):
        """Fetch every URL with the shared client and process the results.

        Args:
            urls: Iterable of track URLs.

        Returns:
            A list with one ``process`` result per URL, in input order.
        """
        return [self.process(self.client.get_track_info(url)) for url in urls]
Connection Pooling¶
With the requests backend, SpotifyClient pools and reuses connections automatically:
# Connection pooling is automatic with requests backend
client = SpotifyClient(browser_type="requests")

# Connections are reused for better performance.
# Slicing (rather than indexing range(100)) keeps this safe when fewer
# than 100 URLs are available.
for url in urls[:100]:
    client.get_track_info(url)  # Reuses connections
Browser Selection¶
Performance Comparison¶
| Browser | Speed | Memory | Use Case |
|---|---|---|---|
| requests | Fast (100ms) | Low (50MB) | Most scenarios |
| selenium | Slow (2-5s) | High (500MB+) | Complex pages |
When to Use Each¶
# The two constructions below are alternatives: each assignment rebinds
# `client`, so choose one backend per client rather than running both.
# Use requests (default) for speed
client = SpotifyClient(browser_type="requests")
# Use selenium only when necessary
# (e.g., JavaScript-rendered content)
client = SpotifyClient(browser_type="selenium")
Optimizing Selenium¶
If you must use Selenium:
from spotify_scraper import SpotifyClient
# Selenium runs headless by default for better performance
client = SpotifyClient(browser_type="selenium")

# Always close when done to free memory; the `finally` clause runs
# client.close() even if the operations raise.
try:
    # Your operations
    pass
finally:
    client.close()
Caching Strategies¶
In-Memory Caching¶
from functools import lru_cache
import hashlib
class CachedSpotifyClient:
    """Wrap a client with a bounded, per-instance in-memory cache of track data."""

    def __init__(self, client=None):
        """Set up the wrapped client and its cache.

        Args:
            client: Optional existing client to wrap (anything exposing
                ``get_track_info(url)``). A new ``SpotifyClient`` is created
                when omitted.
        """
        self.client = client if client is not None else SpotifyClient()
        # Build the cache per instance. Decorating the method itself with
        # ``lru_cache`` creates a single class-wide cache keyed on ``self``,
        # which shares the 1000-entry budget across all instances and keeps
        # every instance alive while its entries remain cached.
        self._track_cache = lru_cache(maxsize=1000)(self.client.get_track_info)

    def get_track_cached(self, url):
        """Cache track data in memory.

        The URL is used as the cache key exactly as given. On a hit the
        same object that was stored is returned, so callers should copy it
        before mutating.
        """
        return self._track_cache(url)

    def get_track_info(self, url):
        """Return track data for ``url``, served from the cache when possible."""
        # Normalize URL for better cache hits
        url = url.split('?')[0]  # Remove query params
        return self.get_track_cached(url)
Disk Caching¶
import json
import os
from datetime import datetime, timedelta
import hashlib
class DiskCachedClient:
    """Wrap a client with a JSON file cache that expires after a fixed TTL."""

    def __init__(self, cache_dir="cache", ttl_hours=24, client=None):
        """Create the cache directory and the wrapped client.

        Args:
            cache_dir: Directory holding one ``<md5>.json`` file per URL;
                created if it does not exist.
            ttl_hours: Age in hours after which a cache file is ignored.
            client: Optional existing client to wrap (anything exposing
                ``get_track_info(url)``). A new ``SpotifyClient`` is created
                when omitted.
        """
        self.client = client if client is not None else SpotifyClient()
        self.cache_dir = cache_dir
        self.ttl = timedelta(hours=ttl_hours)
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, url):
        """Generate cache file path."""
        # MD5 only derives a fixed-length, filesystem-safe file name here;
        # it is not used for anything security-sensitive.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return os.path.join(self.cache_dir, f"{url_hash}.json")

    def _is_cache_valid(self, cache_path):
        """Check if cache file is still valid."""
        # Ask for the mtime directly instead of checking existence first:
        # the file can disappear between the two calls.
        try:
            mtime = datetime.fromtimestamp(os.path.getmtime(cache_path))
        except OSError:
            return False
        return datetime.now() - mtime < self.ttl

    def get_track_info(self, url):
        """Get track info with disk caching.

        The cache key ignores the query string, so share links that differ
        only in their parameters reuse one entry. The fetch itself still
        receives ``url`` unchanged.
        """
        cache_path = self._get_cache_path(url.split('?')[0])

        # Try cache first
        if self._is_cache_valid(cache_path):
            try:
                with open(cache_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (OSError, ValueError):
                # Unreadable or corrupt entry (e.g. an interrupted write):
                # fall through and refetch instead of failing until the
                # TTL expires. JSONDecodeError is a ValueError.
                pass

        # Fetch and cache
        data = self.client.get_track_info(url)

        # Write to a temporary file and rename it into place, so a crash
        # or a serialization error never leaves a truncated cache file.
        tmp_path = f"{cache_path}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f)
            os.replace(tmp_path, cache_path)
        except Exception:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        return data
Redis Caching¶
import redis
import json
class RedisCachedClient:
    """Wrap a client with a Redis-backed cache whose entries expire after ``ttl`` seconds."""

    def __init__(self, redis_url="redis://localhost:6379", ttl=3600, client=None):
        """Connect to Redis and set up the wrapped client.

        Args:
            redis_url: Connection URL passed to ``redis.from_url``.
            ttl: Expiry passed to ``setex`` for every cached entry.
            client: Optional existing client to wrap (anything exposing
                ``get_track_info(url)``). A new ``SpotifyClient`` is created
                when omitted.
        """
        self.client = client if client is not None else SpotifyClient()
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get_track_info(self, url):
        """Get track info with Redis caching.

        The cache key ignores the query string, so URLs that differ only in
        their parameters reuse one entry. The fetch itself still receives
        ``url`` unchanged.
        """
        # Build the key once so the read and the write can never disagree.
        key = f"track:{url.split('?')[0]}"

        # Try cache. An empty payload is treated as a miss rather than
        # being handed to json.loads.
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Fetch and cache
        data = self.client.get_track_info(url)
        self.redis.setex(key, self.ttl, json.dumps(data))
        return data
Batch Processing¶
Efficient Batch Extraction¶
import time
from typing import List, Dict, Any
def batch_extract(urls: List[str], delay: float = 0.5, client=None) -> List[Dict[str, Any]]:
    """Extract data from multiple URLs efficiently.

    Args:
        urls: Track URLs to fetch, processed in order.
        delay: Seconds to sleep between consecutive requests (not after the
            last one).
        client: Optional existing client to reuse (anything exposing
            ``get_track_info(url)``). A new ``SpotifyClient`` is created
            when omitted; pass one in when calling this repeatedly so the
            client is not rebuilt on every call.

    Returns:
        One record per URL, in input order: ``{'url', 'data', 'success':
        True}`` on success or ``{'url', 'error', 'success': False}`` when
        the fetch raised.
    """
    if client is None:
        client = SpotifyClient()
    results = []
    total = len(urls)

    print(f"Processing {total} URLs...")

    for i, url in enumerate(urls):
        # Show progress
        if i % 10 == 0:
            print(f"Progress: {i}/{total} ({i/total*100:.1f}%)")

        # Only the fetch is guarded: one bad URL is recorded as a failure
        # and must not abort the rest of the batch.
        try:
            data = client.get_track_info(url)
        except Exception as e:
            results.append({
                'url': url,
                'error': str(e),
                'success': False
            })
        else:
            results.append({
                'url': url,
                'data': data,
                'success': True
            })

        # Rate limiting
        if i < total - 1:
            time.sleep(delay)

    # Summary
    successful = sum(1 for r in results if r['success'])
    print(f"\nCompleted: {successful}/{total} successful")

    return results
Chunked Processing¶
def process_in_chunks(urls: List[str], chunk_size: int = 50):
    """Process URLs in chunks to manage memory.

    Each chunk is handed to ``batch_extract``; the accumulated results are
    checkpointed after every chunk, with a 5 second pause between chunks.

    Args:
        urls: Track URLs to process.
        chunk_size: Number of URLs per chunk; must be at least 1.

    Returns:
        The concatenated ``batch_extract`` results for all chunks.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        # A negative step would silently produce no chunks at all.
        raise ValueError("chunk_size must be at least 1")

    total = len(urls)
    all_results = []

    for i in range(0, total, chunk_size):
        chunk = urls[i:i + chunk_size]
        print(f"\nProcessing chunk {i//chunk_size + 1}...")

        results = batch_extract(chunk)
        all_results.extend(results)

        # Optional: Save intermediate results
        # NOTE(review): save_checkpoint is not defined in this guide; supply
        # your own helper that writes the results to the given file name.
        save_checkpoint(all_results, f"checkpoint_{i}.json")

        # Rest between chunks
        if i + chunk_size < total:
            print("Resting between chunks...")
            time.sleep(5)

    return all_results
Concurrent Operations¶
Thread Pool Executor¶
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def concurrent_extract(urls: List[str], max_workers: int = 5, client=None):
    """Extract data concurrently with thread pool.

    Concurrency is bounded by ``max_workers``; no delay is inserted between
    requests.

    Args:
        urls: Track URLs to fetch.
        max_workers: Maximum number of worker threads.
        client: Optional existing client to reuse (anything exposing
            ``get_track_info(url)``). A new ``SpotifyClient`` is created
            when omitted.

    Returns:
        One record per URL in *completion* order (not input order):
        ``{'url', 'data', 'success': True}`` or
        ``{'url', 'error', 'success': False}``.
    """
    if client is None:
        # NOTE(review): the one client is shared by every worker thread;
        # confirm SpotifyClient is safe for concurrent use.
        client = SpotifyClient()

    def extract_single(url):
        """Fetch one URL, turning any failure into an error record."""
        try:
            data = client.get_track_info(url)
        except Exception as e:
            return {'url': url, 'error': str(e), 'success': False}
        return {'url': url, 'data': data, 'success': True}

    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        futures = [executor.submit(extract_single, url) for url in urls]

        # Process completed tasks
        for future in as_completed(futures):
            result = future.result()
            results.append(result)

            if result['success']:
                print(f"✓ {result['url']}")
            else:
                print(f"✗ {result['url']}: {result['error']}")

    return results
Async Processing (Advanced)¶
import asyncio
import aiohttp
from typing import List, Dict
class AsyncSpotifyClient:
    """Fetch Spotify embed pages concurrently with aiohttp, under a concurrency cap."""

    def __init__(self, max_concurrent: int = 5):
        """Create the client.

        Args:
            max_concurrent: Maximum number of requests in flight at once.
        """
        # Limit concurrent requests
        self.semaphore = asyncio.Semaphore(max_concurrent)

    @staticmethod
    def _track_id(url: str) -> str:
        """Return the last path segment of ``url``, ignoring query and trailing slash."""
        # Drop the query string first so a '/' inside a parameter value
        # cannot be mistaken for a path separator.
        path = url.split('?')[0].rstrip('/')
        return path.split('/')[-1]

    async def fetch_track(self, session, url):
        """Fetch single track asynchronously.

        Args:
            session: An open ``aiohttp.ClientSession``.
            url: Track URL; its id is used to build the embed URL.

        Returns:
            ``{'url', 'html_size', 'success': True}`` on success, or
            ``{'url', 'error', 'success': False}`` if the request failed or
            the server answered with an error status.
        """
        async with self.semaphore:
            # Convert to embed URL
            embed_url = f"https://open.spotify.com/embed/track/{self._track_id(url)}"

            try:
                async with session.get(embed_url) as response:
                    # Without this, a 404/429 error page would be reported
                    # as a successful fetch.
                    response.raise_for_status()
                    html = await response.text()
            except Exception as e:
                return {'url': url, 'error': str(e), 'success': False}

            # Parse data (simplified)
            return {'url': url, 'html_size': len(html), 'success': True}

    async def batch_fetch(self, urls: List[str]):
        """Fetch multiple tracks asynchronously.

        Returns:
            The ``fetch_track`` results, in the same order as ``urls``.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_track(session, url) for url in urls]
            return await asyncio.gather(*tasks)
# Usage
async def main():
    """Fetch ten placeholder track URLs concurrently and report how many results came back."""
    client = AsyncSpotifyClient()
    urls = ["https://open.spotify.com/track/..." for _ in range(10)]
    results = await client.batch_fetch(urls)
    print(f"Fetched {len(results)} tracks")


# Run (guarded so importing this module does not start network requests)
if __name__ == "__main__":
    asyncio.run(main())
Memory Management¶
Memory-Efficient Processing¶
def process_large_dataset(urls: List[str], output_file: str, client=None, delay: float = 0.5):
    """Process large dataset without loading all in memory.

    Each track is written to ``output_file`` as soon as it is fetched, so
    only one record is held at a time. The file is always a valid JSON
    array, even when some (or all) URLs fail.

    Args:
        urls: Track URLs to fetch, processed in order.
        output_file: Path of the JSON file to (over)write.
        client: Optional existing client to reuse (anything exposing
            ``get_track_info(url)``). A new ``SpotifyClient`` is created
            when omitted.
        delay: Seconds to sleep between consecutive requests.
    """
    import gc

    if client is None:
        client = SpotifyClient()

    last_index = len(urls) - 1
    wrote_any = False

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write('[\n')  # Start JSON array

        for i, url in enumerate(urls):
            try:
                # Serialize before touching the file: if the fetch or the
                # encoding fails, nothing has been written for this item.
                payload = json.dumps(client.get_track_info(url))
            except Exception as e:
                print(f"Error: {url} - {e}")
            else:
                # Write immediately, don't store. The separator depends on
                # whether anything was written, not on the index; otherwise
                # a failure on the first URL leaves a leading comma.
                if wrote_any:
                    f.write(',\n')
                f.write(payload)
                wrote_any = True

            # Force a garbage-collection pass every 100 URLs
            if i % 100 == 0:
                gc.collect()

            if i < last_index:
                time.sleep(delay)  # Rate limit

        f.write('\n]')  # End JSON array
Generator Pattern¶
def track_generator(urls: List[str], client=None, delay: float = 0.5):
    """Generate track data one at a time.

    Args:
        urls: Track URLs to fetch, processed in order.
        client: Optional existing client to reuse (anything exposing
            ``get_track_info(url)``). A new ``SpotifyClient`` is created
            when omitted.
        delay: Seconds to sleep after each request, successful or not.

    Yields:
        The track data for each URL, or ``None`` when the fetch raised.
    """
    if client is None:
        client = SpotifyClient()

    for url in urls:
        # Guard only the fetch; the yield stays outside the try block.
        try:
            track = client.get_track_info(url)
        except Exception as e:
            print(f"Error: {url} - {e}")
            track = None

        yield track
        # Rate limit. This also runs after a failure, so a run of bad
        # URLs is not fired off back to back.
        time.sleep(delay)
# Memory efficient usage
for track_data in track_generator(urls):
    # The generator yields None for a failed URL; test for that explicitly
    # so an empty-but-valid result is not skipped.
    if track_data is not None:
        # Process immediately, don't store
        process_track(track_data)
Network Optimization¶
Retry Logic¶
import time
from typing import Optional
def extract_with_retry(url: str, max_retries: int = 3, client=None) -> Optional[Dict]:
    """Extract with exponential backoff retry.

    Args:
        url: Track URL to fetch.
        max_retries: Total number of attempts. The wait before attempt
            ``n + 1`` is ``2 ** n`` seconds (1s, 2s, 4s, ...).
        client: Optional existing client to reuse (anything exposing
            ``get_track_info(url)``). A new ``SpotifyClient`` is created
            when omitted.

    Returns:
        The track data, or ``None`` if a non-network error occurred or
        ``max_retries`` is less than 1.

    Raises:
        NetworkError: If the final attempt still fails with a network error.
    """
    # NOTE(review): NetworkError is not imported anywhere in this guide;
    # import it from the package's exceptions module before using this.
    if client is None:
        client = SpotifyClient()

    for attempt in range(max_retries):
        try:
            return client.get_track_info(url)
        except NetworkError:
            if attempt >= max_retries - 1:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Retry {attempt + 1} after {wait_time}s...")
            time.sleep(wait_time)
        except Exception as e:
            print(f"Unrecoverable error: {e}")
            return None

    # Reached only when max_retries < 1 and no attempt was made.
    return None
Request Optimization¶
# Use appropriate timeouts
client = SpotifyClient()
# For requests backend, timeouts are built-in
# NOTE(review): the default timeout value and how to override it are not
# shown in this guide; confirm against the SpotifyClient API.
# For selenium, ensure proper cleanup (call client.close() when done)
Profiling & Benchmarking¶
Simple Benchmarking¶
import time
from statistics import mean, stdev
def benchmark_extraction(urls: List[str], iterations: int = 3, client=None):
    """Benchmark extraction performance.

    Times ``iterations`` full passes over ``urls`` and prints each pass's
    duration followed by summary statistics. Failed fetches are ignored
    but still count toward the elapsed time.

    Args:
        urls: Track URLs fetched on every pass.
        iterations: Number of timed passes.
        client: Optional existing client to reuse (anything exposing
            ``get_track_info(url)``). A new ``SpotifyClient`` is created
            when omitted.
    """
    if client is None:
        client = SpotifyClient()
    times = []

    for i in range(iterations):
        # perf_counter is monotonic, unlike time.time(), so clock
        # adjustments during the run cannot skew the measurement.
        start = time.perf_counter()

        for url in urls:
            try:
                client.get_track_info(url)
            except Exception:
                # Deliberately best-effort. A bare ``except:`` would also
                # swallow KeyboardInterrupt and make the run uninterruptible.
                pass

        elapsed = time.perf_counter() - start
        times.append(elapsed)
        print(f"Iteration {i+1}: {elapsed:.2f}s")

    if not times:
        print("\nNo iterations were run.")
        return

    average = mean(times)
    print("\nResults:")
    print(f"Average: {average:.2f}s")
    # stdev() needs at least two samples
    if len(times) > 1:
        print(f"Std Dev: {stdev(times):.2f}s")
    # Avoid dividing by zero when there were no URLs
    if urls:
        print(f"Per URL: {average/len(urls):.3f}s")
Memory Profiling¶
import tracemalloc
def profile_memory_usage():
    """Profile memory usage.

    Traces allocations while fetching 100 placeholder track URLs and
    prints the current and peak traced memory in MB. Replace the
    placeholder URLs with real ones before running.
    """
    tracemalloc.start()
    try:
        # Your operations
        client = SpotifyClient()
        # Keep every result referenced so it is still counted when the
        # snapshot below is taken.
        data = [client.get_track_info(f"...track_{i}") for i in range(100)]

        current, peak = tracemalloc.get_traced_memory()
        print(f"Current memory: {current / 1024 / 1024:.1f} MB")
        print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
    finally:
        # Stop tracing even if a fetch raises; otherwise tracing stays on
        # for the rest of the process.
        tracemalloc.stop()
Performance Best Practices¶
Do's ✅¶
- Reuse client instances
- Use requests backend when possible
- Implement caching for repeated requests
- Add appropriate delays between requests
- Process data in chunks for large datasets
- Close Selenium browsers when done
- Handle errors gracefully
Don'ts ❌¶
- Don't create new clients repeatedly
- Don't use Selenium unless necessary
- Don't make parallel requests without limits
- Don't ignore rate limits
- Don't load entire datasets into memory
- Don't forget to close resources
Performance Checklist¶
Before deploying to production:
- [ ] Client instances are reused
- [ ] Appropriate browser backend selected
- [ ] Caching implemented (if applicable)
- [ ] Rate limiting in place
- [ ] Error handling and retries
- [ ] Memory usage monitored
- [ ] Resources properly closed
- [ ] Benchmarked performance
- [ ] Logging configured appropriately
Real-World Example¶
Here's a complete, optimized data extraction system:
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import lru_cache
from typing import List, Dict, Any
class OptimizedSpotifyExtractor:
    """Extract tracks with client reuse, LRU caching, bounded threads and statistics."""

    def __init__(self, cache_size=1000, max_workers=3, client=None):
        """Set up the client, cache and counters.

        Args:
            cache_size: Maximum number of URLs kept in the in-memory cache.
            max_workers: Thread-pool size used by ``extract_batch``.
            client: Optional existing client to wrap (anything exposing
                ``get_track_info(url)``). A new ``SpotifyClient`` is created
                when omitted.
        """
        self.client = client if client is not None else SpotifyClient()
        self.max_workers = max_workers
        self.stats = {
            'requests': 0,
            'cache_hits': 0,
            'errors': 0,
            'start_time': datetime.now()
        }
        # extract_batch updates the counters from several threads.
        self._stats_lock = threading.Lock()
        # Per-thread flag telling extract_track whether its lookup reached
        # the client or was answered from the cache.
        self._thread_state = threading.local()
        # Per-instance cache sized by cache_size. A method-level decorator
        # would ignore cache_size and share one cache across instances.
        self._cached_fetch = lru_cache(maxsize=cache_size)(self._fetch)

    def _bump(self, key: str) -> None:
        """Increment one counter in ``self.stats`` under the lock."""
        with self._stats_lock:
            self.stats[key] += 1

    def _fetch(self, url: str) -> Dict[str, Any]:
        """Perform an uncached fetch; runs only on a cache miss."""
        self._thread_state.fetched = True
        self._bump('requests')
        return self.client.get_track_info(url)

    def _extract_cached(self, url: str) -> Dict[str, Any]:
        """Extract with caching."""
        return self._cached_fetch(url)

    def extract_track(self, url: str) -> Dict[str, Any]:
        """Extract single track with caching.

        The query string is stripped before the lookup. Any exception from
        the fetch is counted in ``stats['errors']`` and re-raised.
        """
        # Normalize URL
        url = url.split('?')[0]

        # _fetch flips this flag when the cache misses. The wrapped
        # function runs in the calling thread, so the flag is per-call.
        self._thread_state.fetched = False
        try:
            result = self._extract_cached(url)
        except Exception:
            self._bump('errors')
            raise

        if not self._thread_state.fetched:
            self._bump('cache_hits')
        return result

    def extract_batch(self, urls: List[str], delay: float = 0.5) -> List[Dict[str, Any]]:
        """Extract batch with rate limiting.

        Args:
            urls: Track URLs to extract.
            delay: Seconds to wait between submitting consecutive URLs.

        Returns:
            One entry per URL in input order; ``None`` where the extraction
            failed or did not finish within 30 seconds.
        """
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []

            for i, url in enumerate(urls):
                # Submit with delay
                if i > 0:
                    time.sleep(delay)  # Rate limit

                futures.append((url, executor.submit(self.extract_track, url)))

            # Collect results
            for url, future in futures:
                try:
                    results.append(future.result(timeout=30))
                except Exception as e:
                    print(f"Failed: {url} - {e}")
                    results.append(None)

        return results

    def get_stats(self) -> Dict[str, Any]:
        """Get performance statistics.

        ``cache_hit_rate`` is hits divided by all lookups (hits plus
        uncached requests), so it always lies between 0 and 1.
        """
        with self._stats_lock:
            requests = self.stats['requests']
            hits = self.stats['cache_hits']
            errors = self.stats['errors']
        elapsed = (datetime.now() - self.stats['start_time']).total_seconds()

        return {
            'total_requests': requests,
            'cache_hits': hits,
            'cache_hit_rate': hits / max(1, requests + hits),
            'errors': errors,
            'elapsed_seconds': elapsed,
            # Elapsed time is clamped to 1s so a just-created extractor
            # does not divide by (nearly) zero.
            'requests_per_second': requests / max(1, elapsed)
        }
# Usage: build one extractor and keep it for the whole run
extractor = OptimizedSpotifyExtractor()

# Extract a batch of 100 (placeholder) track URLs
urls = ["https://open.spotify.com/track/..."] * 100
results = extractor.extract_batch(urls)

# Report throughput and cache effectiveness
stats = extractor.get_stats()
print(f"Performance: {stats['requests_per_second']:.2f} req/s")
print(f"Cache hit rate: {stats['cache_hit_rate']:.1%}")
This optimized system includes:
- Client reuse
- In-memory caching
- Concurrent processing with limits
- Rate limiting
- Performance statistics
- Error handling
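Expected performance: roughly 1-2 uncached requests/second with the 0.5s delay between submissions (the delay caps throughput at 2 requests/second); URLs already in the cache are returned without a network request.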