Async¶
AsyncSpotifyClient is a one-to-one mirror of SpotifyClient over an async
transport. Every fetch and download method has the same name and signature —
you just await it and manage the client with async with.
The async client¶
import asyncio
from spotify_scraper import AsyncSpotifyClient
async def main() -> None:
async with AsyncSpotifyClient() as client:
track = await client.get_track("4uLU6hMCjMI75M1A2tKUQC")
print(track.name)
asyncio.run(main())
The same methods are available as on the sync client:
await client.get_track(value)
await client.get_album(value)
await client.get_artist(value)
await client.get_playlist(value, max_tracks=100)
await client.get_episode(value)
await client.get_show(value, max_episodes=50)
await client.download_cover(entity, dest, size="largest", filename=None)
await client.download_preview(entity, dest, filename=None, embed_cover=False)
Closing the client¶
async with closes the client for you. To manage it manually, await aclose:
client = AsyncSpotifyClient()
try:
track = await client.get_track("4uLU6hMCjMI75M1A2tKUQC")
finally:
await client.aclose()
Using the client after it is closed raises SpotifyScraperError.
Fetch many entities concurrently¶
The reason to go async is concurrency. asyncio.gather runs many fetches at
once over a single client and connection pool:
import asyncio
from spotify_scraper import AsyncSpotifyClient
TRACK_IDS = [
"4uLU6hMCjMI75M1A2tKUQC",
"7ouMYWpwJ422jRcDASZB7P",
"0VjIjW4GlUZAMYd2vXMi3b",
]
async def main() -> None:
async with AsyncSpotifyClient() as client:
tracks = await asyncio.gather(
*(client.get_track(track_id) for track_id in TRACK_IDS)
)
for track in tracks:
print(track.name, "—", track.artists[0].name)
asyncio.run(main())
You can mix entity types in one batch:
async with AsyncSpotifyClient() as client:
track, album, artist = await asyncio.gather(
client.get_track("4uLU6hMCjMI75M1A2tKUQC"),
client.get_album("4aawyAB9vmqN3uQ7FjRGTy"),
client.get_artist("0gxyHStUsqpMadRV0Di1Qt"),
)
Handle partial failures¶
By default asyncio.gather cancels the whole batch on the first exception. Pass
return_exceptions=True to collect successes and failures side by side:
import asyncio
from spotify_scraper import AsyncSpotifyClient, NotFoundError
async def main() -> None:
ids = ["4uLU6hMCjMI75M1A2tKUQC", "0000000000000000000000"]
async with AsyncSpotifyClient() as client:
results = await asyncio.gather(
*(client.get_track(i) for i in ids),
return_exceptions=True,
)
for entity_id, result in zip(ids, results):
if isinstance(result, NotFoundError):
print(entity_id, "-> not found")
elif isinstance(result, Exception):
print(entity_id, "-> error:", result)
else:
print(entity_id, "->", result.name)
asyncio.run(main())
See Error handling for the full exception tree.
Rate limiting still applies
Concurrency does not bypass the throttle. The same token-bucket rate
limiter that protects the sync client governs every async request, per
host, across the whole event loop. Firing 500 gather tasks does not send
500 simultaneous requests — they drain at the configured rate. This is what
keeps you from getting blocked; see Anti-ban & resilience to
tune RateLimit, RetryPolicy, and proxies. The async client accepts every
one of those knobs:
from spotify_scraper import AsyncSpotifyClient, RateLimit, RetryPolicy
client = AsyncSpotifyClient(
rate_limit=RateLimit(per_second=2.0, burst=5),
retry=RetryPolicy(max_attempts=4),
proxy="http://user:pass@proxy.example:8080",
)
Bounding concurrency yourself¶
For very large batches, cap in-flight work with a semaphore so you do not build an enormous task list at once:
import asyncio
from spotify_scraper import AsyncSpotifyClient
async def fetch_all(ids: list[str]) -> list:
semaphore = asyncio.Semaphore(10)
async with AsyncSpotifyClient() as client:
async def one(track_id: str):
async with semaphore:
return await client.get_track(track_id)
return await asyncio.gather(*(one(i) for i in ids))