
Overview

Caching is a technique used to temporarily store copies of data or computation results to improve performance by reducing the need to repeatedly fetch or compute the same data from slower or more resource-intensive sources. In the context of AI applications, caching provides several important benefits:
  • 🚀 Performance improvement - Avoid repeating expensive operations like API calls or complex calculations
  • 💰 Cost reduction - Minimize repeated calls to paid services (like external APIs or LLM providers)
  • ⚡ Latency reduction - Deliver faster responses to users by serving cached results
  • 🔄 Consistency - Ensure consistent responses for identical inputs
BeeAI framework provides a robust caching system with multiple implementations to suit different use cases.

Core concepts

Cache types

BeeAI framework offers several cache implementations out of the box:
Type                Description
UnconstrainedCache  Simple in-memory cache with no limits
SlidingCache        In-memory cache that maintains a maximum number of entries
FileCache           Persistent cache that stores data on disk
NullCache           Special implementation that performs no caching (useful for testing)
Each cache type implements the BaseCache interface, making them interchangeable in your code.
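Because every implementation shares this interface, helper code can be written once against BaseCache and used with any of the cache types above. A minimal sketch (it relies only on the async get and set methods shown in the examples below):
import asyncio
import sys
import traceback

from beeai_framework.cache import BaseCache, SlidingCache, UnconstrainedCache
from beeai_framework.errors import FrameworkError


async def get_or_compute(cache: BaseCache[int], key: str, fallback: int) -> int:
    # Works against the shared interface: return the cached value on a hit,
    # otherwise store and return the fallback.
    cached = await cache.get(key)
    if cached is not None:
        return cached
    await cache.set(key, fallback)
    return fallback


async def main() -> None:
    # The same helper runs unchanged against different cache implementations.
    print(await get_or_compute(UnconstrainedCache(), "answer", 42))  # 42
    print(await get_or_compute(SlidingCache(size=10), "answer", 42))  # 42


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())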

Usage patterns

BeeAI framework supports several caching patterns:
Usage pattern        Description
Direct caching       Manually store and retrieve values
Function decoration  Automatically cache function returns
Tool integration     Cache tool execution results
LLM integration      Cache model responses

Basic usage

Caching function output

The simplest way to use caching is to wrap a function that produces deterministic output:
import asyncio
import sys
import traceback

from beeai_framework.cache import UnconstrainedCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: UnconstrainedCache[int] = UnconstrainedCache()

    async def fibonacci(n: int) -> int:
        cache_key = str(n)
        cached = await cache.get(cache_key)
        if cached is not None:
            return cached

        if n < 1:
            result = 0
        elif n <= 2:
            result = 1
        else:
            result = await fibonacci(n - 1) + await fibonacci(n - 2)

        await cache.set(cache_key, result)
        return result

    print(await fibonacci(10))  # 55
    print(await fibonacci(9))  # 34 (retrieved from cache)
    print(f"Cache size {await cache.size()}")  # 10


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

Using with tools

BeeAI framework’s caching system seamlessly integrates with tools:
import asyncio
import sys
import traceback

from beeai_framework.cache import SlidingCache
from beeai_framework.errors import FrameworkError
from beeai_framework.tools.search.wikipedia import (
    WikipediaTool,
    WikipediaToolInput,
)


async def main() -> None:
    wikipedia_client = WikipediaTool({"full_text": True, "cache": SlidingCache(size=100, ttl=5 * 60)})

    print(await wikipedia_client.cache.size())  # 0
    tool_input = WikipediaToolInput(query="United States")
    first = await wikipedia_client.run(tool_input)
    print(await wikipedia_client.cache.size())  # 1

    # a new request with exactly the same input is retrieved from the cache
    tool_input = WikipediaToolInput(query="United States")
    second = await wikipedia_client.run(tool_input)
    print(first.get_text_content() == second.get_text_content())  # True
    print(await wikipedia_client.cache.size())  # 1


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

Using with LLMs

You can also cache LLM responses to save on API costs:
import asyncio
import sys
import traceback

from beeai_framework.adapters.ollama import OllamaChatModel
from beeai_framework.backend import ChatModelParameters, UserMessage
from beeai_framework.cache import SlidingCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    llm = OllamaChatModel("granite3.3")
    llm.config(parameters=ChatModelParameters(max_tokens=25), cache=SlidingCache(size=50))

    print(await llm.cache.size())  # 0
    first = await llm.run([UserMessage("Who is Amilcar Cabral?")])
    print(await llm.cache.size())  # 1

    # a new request with exactly the same input is retrieved from the cache
    second = await llm.run([UserMessage("Who is Amilcar Cabral?")])
    print(first.get_text_content() == second.get_text_content())  # True
    print(await llm.cache.size())  # 1


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())


Cache types

UnconstrainedCache

The simplest cache type with no constraints on size or entry lifetime. Good for development and smaller applications.
import asyncio
import sys
import traceback

from beeai_framework.cache import UnconstrainedCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: UnconstrainedCache[int] = UnconstrainedCache()

    # Save
    await cache.set("a", 1)
    await cache.set("b", 2)

    # Read
    result = await cache.has("a")
    print(result)  # True

    # Meta
    print(cache.enabled)  # True
    print(await cache.has("a"))  # True
    print(await cache.has("b"))  # True
    print(await cache.has("c"))  # False
    print(await cache.size())  # 2

    # Delete
    await cache.delete("a")
    print(await cache.has("a"))  # False

    # Clear
    await cache.clear()
    print(await cache.size())  # 0


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

SlidingCache

Maintains a maximum number of entries, removing the oldest entries when the limit is reached.
import asyncio
import sys
import traceback

from beeai_framework.cache import SlidingCache
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: SlidingCache[int] = SlidingCache(
        size=3,  # (required) maximum number of items that can live in the cache at any one time
        ttl=1,  # (optional, default is infinity) time in seconds after which an entry is removed from the cache
    )

    await cache.set("a", 1)
    await cache.set("b", 2)
    await cache.set("c", 3)

    await cache.set("d", 4)  # overflow - cache internally removes the oldest entry (key "a")

    print(await cache.has("a"))  # False
    print(await cache.size())  # 3


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

FileCache

Persists cache data to disk so it survives application restarts. Use it when cached state must outlive a single process or be shared between workers. Persisted entries still respect TTL and eviction settings, so design your limits accordingly.
import asyncio
import json
import sys
import tempfile
import time
import traceback
from collections import OrderedDict
from collections.abc import Mapping
from pathlib import Path
from typing import Generic, TypeVar

from beeai_framework.cache import BaseCache
from beeai_framework.errors import FrameworkError

T = TypeVar("T")


class JsonFileCache(BaseCache[T], Generic[T]):
    """Simple file-backed cache with optional LRU eviction and TTL support."""

    def __init__(self, path: Path, *, size: int = 128, ttl: float | None = None) -> None:
        super().__init__()
        self._path = path
        self._size = size
        self._ttl = ttl
        self._items: OrderedDict[str, tuple[T, float | None]] = OrderedDict()
        self._load_from_disk()

    @property
    def source(self) -> Path:
        return self._path

    @classmethod
    async def from_mapping(
        cls,
        path: Path,
        items: Mapping[str, T],
        *,
        size: int = 128,
        ttl: float | None = None,
    ) -> "JsonFileCache[T]":
        cache = cls(path, size=size, ttl=ttl)
        for key, value in items.items():
            await cache.set(key, value)
        return cache

    async def size(self) -> int:
        await self._purge_expired()
        return len(self._items)

    async def set(self, key: str, value: T) -> None:
        await self._purge_expired()
        expires_at = time.time() + self._ttl if self._ttl is not None else None
        if key in self._items:
            self._items.pop(key)
        self._items[key] = (value, expires_at)
        await self._enforce_capacity()
        self._dump_to_disk()

    async def get(self, key: str) -> T | None:
        await self._purge_expired()
        if key not in self._items:
            return None

        value, expires_at = self._items.pop(key)
        self._items[key] = (value, expires_at)
        return value

    async def has(self, key: str) -> bool:
        await self._purge_expired()
        return key in self._items

    async def delete(self, key: str) -> bool:
        await self._purge_expired()
        if key not in self._items:
            return False

        self._items.pop(key)
        self._dump_to_disk()
        return True

    async def clear(self) -> None:
        self._items.clear()
        if self._path.exists():
            self._path.unlink()

    async def reload(self) -> None:
        self._items.clear()
        self._load_from_disk()
        await self._purge_expired()

    async def _purge_expired(self) -> None:
        now = time.time()
        expired_keys = [
            key for key, (_, expires_at) in list(self._items.items()) if expires_at is not None and expires_at <= now
        ]
        for key in expired_keys:
            self._items.pop(key, None)
        if expired_keys:
            self._dump_to_disk()

    async def _enforce_capacity(self) -> None:
        while len(self._items) > self._size:
            self._items.popitem(last=False)  # drop the least recently used entry

    def _load_from_disk(self) -> None:
        if not self._path.exists():
            return

        try:
            raw = json.loads(self._path.read_text())
        except json.JSONDecodeError:
            return

        now = time.time()
        for key, payload in raw.items():
            expires_at = payload.get("expires_at")
            if expires_at is not None and expires_at <= now:
                continue
            self._items[key] = (payload["value"], expires_at)

    def _dump_to_disk(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        data = {key: {"value": value, "expires_at": expires_at} for key, (value, expires_at) in self._items.items()}
        self._path.write_text(json.dumps(data, indent=2))


async def main() -> None:
    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "bee_cache.json"
        cache: JsonFileCache[dict[str, str]] = JsonFileCache(path, size=2, ttl=1.5)

        await cache.set("profile", {"name": "Bee", "role": "assistant"})
        await cache.set("settings", {"theme": "dark"})
        print(f"Cache persisted to {cache.source}")

        await cache.set("session", {"token": "abc123"})
        print(await cache.has("profile"))  # False -> evicted when capacity exceeded

        reloaded: JsonFileCache[dict[str, str]] = JsonFileCache(path, size=2, ttl=1.5)
        print(await reloaded.get("settings"))  # {'theme': 'dark'}

        await asyncio.sleep(1.6)
        await reloaded.reload()
        print(await reloaded.get("session"))  # None -> TTL expired


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

With custom provider

Seed a file-backed cache from another provider when you want to warm the disk cache before first use or promote hot data captured in memory. The example below clones an UnconstrainedCache into the JSON file cache so new processes can reuse it immediately.
import asyncio
import sys
import tempfile
import traceback
from pathlib import Path
from typing import TypeVar

from beeai_framework.cache import UnconstrainedCache
from beeai_framework.errors import FrameworkError
from examples.cache.file_cache import JsonFileCache

T = TypeVar("T")


async def export_cache(provider: UnconstrainedCache[T]) -> dict[str, T]:
    """Clone an in-memory cache so that we can safely persist its content."""
    cloned = await provider.clone()
    # UnconstrainedCache stores entries in a simple dict, so cloning is inexpensive here.
    return getattr(cloned, "_provider", {}).copy()


async def main() -> None:
    memory_cache: UnconstrainedCache[int] = UnconstrainedCache()
    await memory_cache.set("tasks:open", 7)
    await memory_cache.set("tasks:closed", 12)

    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "bee_cache.json"

        file_cache = await JsonFileCache.from_mapping(path, await export_cache(memory_cache), size=10, ttl=10)
        print(f"Promoted cache to disk: {file_cache.source}")

        print(await file_cache.get("tasks:open"))  # 7
        await file_cache.set("tasks:stale", 1)
        print(await file_cache.size())  # 3

        reloaded: JsonFileCache[int] = JsonFileCache(path, size=10, ttl=10)
        print(await reloaded.get("tasks:closed"))  # 12


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

NullCache

A special cache that implements the BaseCache interface but performs no caching. Useful for testing or for temporarily disabling caching. It exists to support the Null object pattern.
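Below is a minimal sketch of how NullCache can be dropped in place of a real cache; it assumes NullCache is exported from beeai_framework.cache like the other cache types and that its operations are no-ops:
import asyncio
import sys
import traceback

from beeai_framework.cache import NullCache  # assumed export, alongside the other cache types
from beeai_framework.errors import FrameworkError


async def main() -> None:
    cache: NullCache[int] = NullCache()

    await cache.set("a", 1)  # no-op: nothing is stored
    print(await cache.get("a"))  # None
    print(await cache.has("a"))  # False
    print(await cache.size())  # 0


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())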

Advanced usage

Cache decorator

Create a reusable decorator when you want to keep caching logic close to your functions without wiring cache calls manually.
import asyncio
import sys
import time
import traceback

from beeai_framework.cache import SlidingCache, cached
from beeai_framework.errors import FrameworkError

request_cache: SlidingCache[str] = SlidingCache(size=8, ttl=2)


class ReportGenerator:
    def __init__(self) -> None:
        self._call_counter = 0

    @cached(request_cache)
    async def generate(self, department: str) -> str:
        self._call_counter += 1
        await asyncio.sleep(0.1)
        timestamp = time.time()
        return f"{department}:{self._call_counter}@{timestamp:.0f}"


async def main() -> None:
    generator = ReportGenerator()
    first = await generator.generate("sales")
    second = await generator.generate("sales")
    print(first == second)  # True -> cached result

    await asyncio.sleep(2.1)  # TTL expired
    third = await generator.generate("sales")
    print(first == third)  # False -> cache miss, recomputed


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

For more complex caching logic, you can customize key generation: use custom key builders to partition cache entries per tenant or time window, and clear the cache in response to deployment events.
import asyncio
import datetime as dt
import random
import sys
import traceback
from typing import Any

from beeai_framework.cache import BaseCache, SlidingCache, cached
from beeai_framework.errors import FrameworkError

activity_cache: SlidingCache[dict[str, Any]] = SlidingCache(size=16, ttl=5)


def session_cache_key(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
    user_id = kwargs.get("user_id") or args[0]
    scope = kwargs.get("scope", "default")
    bucket: int | None = kwargs.get("minute_bucket")
    payload = {"user_id": user_id, "scope": scope}
    if bucket is not None:
        payload["minute_bucket"] = bucket
    return BaseCache.generate_key(payload)


class FeatureFlagService:
    def __init__(self, *, caching_enabled: bool = True) -> None:
        self._enabled = caching_enabled
        self._db_hits = 0

    @cached(activity_cache, enabled=True, key_fn=session_cache_key)
    async def load_flags(
        self, user_id: str, scope: str = "default", minute_bucket: int | None = None
    ) -> dict[str, Any]:
        self._db_hits += 1
        await asyncio.sleep(0.05)
        return {
            "user": user_id,
            "scope": scope,
            "db_hits": self._db_hits,
            "flags": {"beta_search": random.choice([True, False])},
            "refreshed_at": dt.datetime.now(dt.UTC).isoformat(timespec="seconds"),
        }


async def main() -> None:
    service = FeatureFlagService()
    bucket = int(dt.datetime.now(dt.UTC).timestamp() // 60)

    first = await service.load_flags("42", scope="admin", minute_bucket=bucket)
    second = await service.load_flags("42", scope="admin", minute_bucket=bucket)
    print(first == second)  # True -> same cache key within a minute bucket

    await activity_cache.clear()  # Manual invalidation when new feature set deployed
    refreshed = await service.load_flags("42", scope="admin", minute_bucket=bucket)
    print(refreshed["db_hits"])  # 2 -> cache miss due to clear

    # Changing scope hits a different cache entry without flushing existing data.
    other_scope = await service.load_flags("42", scope="viewer", minute_bucket=bucket)
    print(other_scope["scope"])  # viewer


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

CacheFn helper

For more dynamic caching needs, the CacheFn helper provides a functional approach. It is well suited for API tokens or other resources that return an expiry with each refresh: call update_ttl before returning the value so the cache matches the upstream lifetime.
import asyncio
import random
import sys
import traceback
from typing import TypedDict

from beeai_framework.cache import CacheFn
from beeai_framework.errors import FrameworkError


class TokenResponse(TypedDict):
    token: str
    expires_in: float


async def main() -> None:
    async def fetch_api_token() -> str:
        response: TokenResponse = {"token": f"TOKEN-{random.randint(1000, 9999)}", "expires_in": 0.2}
        get_token.update_ttl(response["expires_in"])
        await asyncio.sleep(0.05)
        return response["token"]

    get_token = CacheFn.create(fetch_api_token, default_ttl=0.1)

    first = await get_token()
    second = await get_token()
    print(first == second)  # True -> cached value

    await asyncio.sleep(0.25)
    refreshed = await get_token()
    print(first == refreshed)  # False -> TTL elapsed, value refreshed


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())


Creating a custom cache provider

You can create your own cache implementation by extending the BaseCache class:
from typing import TypeVar

from beeai_framework.cache import BaseCache

T = TypeVar("T")


class CustomCache(BaseCache[T]):
    async def size(self) -> int:
        raise NotImplementedError("CustomCache 'size' not yet implemented")

    async def set(self, _key: str, _value: T) -> None:
        raise NotImplementedError("CustomCache 'set' not yet implemented")

    async def get(self, key: str) -> T | None:
        raise NotImplementedError("CustomCache 'get' not yet implemented")

    async def has(self, key: str) -> bool:
        raise NotImplementedError("CustomCache 'has' not yet implemented")

    async def delete(self, key: str) -> bool:
        raise NotImplementedError("CustomCache 'delete' not yet implemented")

    async def clear(self) -> None:
        raise NotImplementedError("CustomCache 'clear' not yet implemented")


Examples