
# Cache

## Overview

Caching temporarily stores copies of data or computation results so that the same data does not have to be fetched or recomputed from slower or more resource-intensive sources.

In the context of AI applications, caching provides several important benefits:

* 🚀 **Performance improvement** - Avoid repeating expensive operations like API calls or complex calculations
* 💰 **Cost reduction** - Minimize repeated calls to paid services (like external APIs or LLM providers)
* ⚡ **Latency reduction** - Deliver faster responses to users by serving cached results
* 🔄 **Consistency** - Ensure consistent responses for identical inputs

The BeeAI framework provides a caching system with several implementations to suit different use cases.

***

## Core concepts

### Cache types

BeeAI framework offers several cache implementations out of the box:

| Type                   | Description                                                          |
| :--------------------- | :------------------------------------------------------------------- |
| **UnconstrainedCache** | Simple in-memory cache with no limits                                |
| **SlidingCache**       | In-memory cache that maintains a maximum number of entries           |
| **FileCache**          | Persistent cache that stores data on disk                            |
| **NullCache**          | Special implementation that performs no caching (useful for testing) |

Each cache type implements the `BaseCache` interface, making them interchangeable in your code.

### Usage patterns

BeeAI framework supports several caching patterns:

| Usage pattern           | Description                          |
| :---------------------- | :----------------------------------- |
| **Direct caching**      | Manually store and retrieve values   |
| **Function decoration** | Automatically cache function returns |
| **Tool integration**    | Cache tool execution results         |
| **LLM integration**     | Cache model responses                |

***

## Basic usage

### Caching function output

The simplest way to use caching is to wrap a function that produces deterministic output:

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import traceback

  from beeai_framework.cache import UnconstrainedCache
  from beeai_framework.errors import FrameworkError


  async def main() -> None:
      cache: UnconstrainedCache[int] = UnconstrainedCache()

      async def fibonacci(n: int) -> int:
          cache_key = str(n)
          cached = await cache.get(cache_key)
          if cached is not None:  # a cached 0 is falsy, so compare against None explicitly
              return int(cached)

          if n < 1:
              result = 0
          elif n <= 2:
              result = 1
          else:
              result = await fibonacci(n - 1) + await fibonacci(n - 2)

          await cache.set(cache_key, result)
          return result

      print(await fibonacci(10))  # 55
      print(await fibonacci(9))  # 34 (retrieved from cache)
      print(f"Cache size {await cache.size()}")  # 10


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { UnconstrainedCache } from "beeai-framework/cache/unconstrainedCache";

  const cache = new UnconstrainedCache<number>();

  async function fibonacci(n: number): Promise<number> {
    const cacheKey = n.toString();
    const cached = await cache.get(cacheKey);
    if (cached !== undefined) {
      return cached;
    }

    const result = n < 1 ? 0 : n <= 2 ? 1 : (await fibonacci(n - 1)) + (await fibonacci(n - 2));
    await cache.set(cacheKey, result);
    return result;
  }

  console.info(await fibonacci(10)); // 55
  console.info(await fibonacci(9)); // 34 (retrieved from cache)
  console.info(`Cache size ${await cache.size()}`); // 10

  ```
</CodeGroup>

### Using with tools

Tools accept a cache instance, so repeated calls with identical input are served from the cache instead of running the tool again:

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import traceback

  from beeai_framework.cache import SlidingCache
  from beeai_framework.errors import FrameworkError
  from beeai_framework.tools.search.wikipedia import (
      WikipediaTool,
      WikipediaToolInput,
  )


  async def main() -> None:
      wikipedia_client = WikipediaTool({"full_text": True, "cache": SlidingCache(size=100, ttl=5 * 60)})

      print(await wikipedia_client.cache.size())  # 0
      tool_input = WikipediaToolInput(query="United States")
      first = await wikipedia_client.run(tool_input)
      print(await wikipedia_client.cache.size())  # 1

      # a new request with EXACTLY the same input is served from the cache
      tool_input = WikipediaToolInput(query="United States")
      second = await wikipedia_client.run(tool_input)
      print(first.get_text_content() == second.get_text_content())  # True
      print(await wikipedia_client.cache.size())  # 1


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { SlidingCache } from "beeai-framework/cache/slidingCache";
  import { WikipediaTool } from "beeai-framework/tools/search/wikipedia";

  const wikipedia = new WikipediaTool({
    cache: new SlidingCache({
      size: 100, // max 100 entries
      ttl: 5 * 60 * 1000, // 5 minutes lifespan
    }),
  });

  const response = await wikipedia.run({
    query: "United States",
  });
  // subsequent requests with EXACTLY the same input are served from the cache

  ```
</CodeGroup>

### Using with LLMs

You can also cache LLM responses to save on API costs:

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import traceback

  from beeai_framework.adapters.ollama import OllamaChatModel
  from beeai_framework.backend import ChatModelParameters, UserMessage
  from beeai_framework.cache import SlidingCache
  from beeai_framework.errors import FrameworkError


  async def main() -> None:
      llm = OllamaChatModel("granite3.3")
      llm.config(parameters=ChatModelParameters(max_tokens=25), cache=SlidingCache(size=50))

      print(await llm.cache.size())  # 0
      first = await llm.run([UserMessage("Who is Amilcar Cabral?")])
      print(await llm.cache.size())  # 1

      # a new request with EXACTLY the same input is served from the cache
      second = await llm.run([UserMessage("Who is Amilcar Cabral?")])
      print(first.get_text_content() == second.get_text_content())  # True
      print(await llm.cache.size())  # 1


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { SlidingCache } from "beeai-framework/cache/slidingCache";
  import { OllamaChatModel } from "beeai-framework/adapters/ollama/backend/chat";
  import { UserMessage } from "beeai-framework/backend/message";

  const llm = new OllamaChatModel("granite4:micro");
  llm.config({
    cache: new SlidingCache({
      size: 50,
    }),
    parameters: {
      maxTokens: 25,
    },
  });

  console.info(await llm.cache.size()); // 0
  const first = await llm.create({
    messages: [new UserMessage("Who was Alan Turing?")],
  });
  // subsequent requests with EXACTLY the same input are served from the cache
  console.info(await llm.cache.size()); // 1
  const second = await llm.create({
    messages: [new UserMessage("Who was Alan Turing?")],
  });
  console.info(first.getTextContent() === second.getTextContent()); // true
  console.info(await llm.cache.size()); // 1

  ```
</CodeGroup>

***

## Cache types

### UnconstrainedCache

The simplest cache type with no constraints on size or entry lifetime. Good for development and smaller applications.

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import traceback

  from beeai_framework.cache import UnconstrainedCache
  from beeai_framework.errors import FrameworkError


  async def main() -> None:
      cache: UnconstrainedCache[int] = UnconstrainedCache()

      # Save
      await cache.set("a", 1)
      await cache.set("b", 2)

      # Read
      result = await cache.get("a")
      print(result)  # 1

      # Meta
      print(cache.enabled)  # True
      print(await cache.has("a"))  # True
      print(await cache.has("b"))  # True
      print(await cache.has("c"))  # False
      print(await cache.size())  # 2

      # Delete
      await cache.delete("a")
      print(await cache.has("a"))  # False

      # Clear
      await cache.clear()
      print(await cache.size())  # 0


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { UnconstrainedCache } from "beeai-framework/cache/unconstrainedCache";

  const cache = new UnconstrainedCache();

  // Save
  await cache.set("a", 1);
  await cache.set("b", 2);

  // Read
  const result = await cache.get("a");
  console.log(result); // 1

  // Meta
  console.log(cache.enabled); // true
  console.log(await cache.has("a")); // true
  console.log(await cache.has("b")); // true
  console.log(await cache.has("c")); // false
  console.log(await cache.size()); // 2

  // Delete
  await cache.delete("a");
  console.log(await cache.has("a")); // false

  // Clear
  await cache.clear();
  console.log(await cache.size()); // 0

  ```
</CodeGroup>

### SlidingCache

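Keeps at most a fixed number of entries and evicts the oldest entry once that limit is exceeded. An optional TTL additionally removes entries after a set time; note that the examples below express it in seconds in Python and in milliseconds in TypeScript.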

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import traceback

  from beeai_framework.cache import SlidingCache
  from beeai_framework.errors import FrameworkError


  async def main() -> None:
      cache: SlidingCache[int] = SlidingCache(
          size=3,  # (required) maximum number of entries held in the cache at any one time
          ttl=1,  # (optional, default is infinite) time in seconds after which an entry is removed
      )

      await cache.set("a", 1)
      await cache.set("b", 2)
      await cache.set("c", 3)

      await cache.set("d", 4)  # overflow - cache internally removes the oldest entry (key "a")

      print(await cache.has("a"))  # False
      print(await cache.size())  # 3


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { SlidingCache } from "beeai-framework/cache/slidingCache";

  const cache = new SlidingCache<number>({
    size: 3, // (required) maximum number of entries held in the cache at any one time
    ttl: 1000, // (optional, default is Infinity) time in milliseconds after which an entry is removed
  });

  await cache.set("a", 1);
  await cache.set("b", 2);
  await cache.set("c", 3);

  await cache.set("d", 4); // overflow - cache internally removes the oldest entry (key "a")
  console.log(await cache.has("a")); // false
  console.log(await cache.size()); // 3

  ```
</CodeGroup>
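
The eviction behaviour described in this section can be pictured with a small standalone sketch. It is not the framework's implementation: `TinySlidingCache` is a hypothetical, synchronous stand-in built on `collections.OrderedDict`. It assumes that the oldest inserted entry is the one dropped on overflow, and it takes an injectable clock so TTL expiry can be shown without sleeping.

```python
import time
from collections import OrderedDict
from collections.abc import Callable
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class TinySlidingCache(Generic[T]):
    """Hypothetical stand-in: bounded cache with optional TTL (not BeeAI's SlidingCache)."""

    def __init__(
        self,
        size: int,
        ttl: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._size = size
        self._ttl = ttl
        self._clock = clock
        # Maps key -> (value, expiry time or None); insertion order doubles as age.
        self._items: OrderedDict[str, tuple[T, Optional[float]]] = OrderedDict()

    def _purge_expired(self) -> None:
        now = self._clock()
        expired = [k for k, (_, exp) in self._items.items() if exp is not None and exp <= now]
        for key in expired:
            del self._items[key]

    def set(self, key: str, value: T) -> None:
        self._purge_expired()
        expires_at = self._clock() + self._ttl if self._ttl is not None else None
        self._items.pop(key, None)  # re-inserting a key makes it the newest entry
        self._items[key] = (value, expires_at)
        while len(self._items) > self._size:
            self._items.popitem(last=False)  # drop the oldest entry

    def get(self, key: str) -> Optional[T]:
        self._purge_expired()
        entry = self._items.get(key)
        return entry[0] if entry is not None else None

    def size(self) -> int:
        self._purge_expired()
        return len(self._items)


fake_now = [0.0]  # mutable cell so the lambda below always sees the current fake time
cache: TinySlidingCache[int] = TinySlidingCache(size=3, ttl=10, clock=lambda: fake_now[0])
for key, value in [("a", 1), ("b", 2), ("c", 3), ("d", 4)]:
    cache.set(key, value)

print(cache.get("a"))  # None: "a" was the oldest entry and was evicted
print(cache.size())  # 3
fake_now[0] = 11.0  # advance the fake clock past the TTL
print(cache.size())  # 0: every remaining entry has expired
```

Keeping the size limit and the TTL as two independent checks means either one can remove an entry first, which is worth remembering when choosing both values.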

### FileCache

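Persists cache data to disk so that entries survive application restarts.
Use it when a cache must outlive the process or when you need to share state between workers. Persisted entries still respect TTL and eviction settings, so design your limits accordingly. In the examples below, the Python tab builds a small JSON-backed cache on top of `BaseCache`, while the TypeScript tab uses the built-in `FileCache`.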

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import json
  import sys
  import tempfile
  import time
  import traceback
  from collections import OrderedDict
  from collections.abc import Mapping
  from pathlib import Path
  from typing import Generic, TypeVar

  from beeai_framework.cache import BaseCache
  from beeai_framework.errors import FrameworkError

  T = TypeVar("T")


  class JsonFileCache(BaseCache[T], Generic[T]):
      """Simple file-backed cache with optional LRU eviction and TTL support."""

      def __init__(self, path: Path, *, size: int = 128, ttl: float | None = None) -> None:
          super().__init__()
          self._path = path
          self._size = size
          self._ttl = ttl
          self._items: OrderedDict[str, tuple[T, float | None]] = OrderedDict()
          self._load_from_disk()

      @property
      def source(self) -> Path:
          return self._path

      @classmethod
      async def from_mapping(
          cls,
          path: Path,
          items: Mapping[str, T],
          *,
          size: int = 128,
          ttl: float | None = None,
      ) -> "JsonFileCache[T]":
          cache = cls(path, size=size, ttl=ttl)
          for key, value in items.items():
              await cache.set(key, value)
          return cache

      async def size(self) -> int:
          await self._purge_expired()
          return len(self._items)

      async def set(self, key: str, value: T) -> None:
          await self._purge_expired()
          expires_at = time.time() + self._ttl if self._ttl is not None else None
          if key in self._items:
              self._items.pop(key)
          self._items[key] = (value, expires_at)
          await self._enforce_capacity()
          self._dump_to_disk()

      async def get(self, key: str) -> T | None:
          await self._purge_expired()
          if key not in self._items:
              return None

          value, expires_at = self._items.pop(key)
          self._items[key] = (value, expires_at)
          return value

      async def has(self, key: str) -> bool:
          await self._purge_expired()
          return key in self._items

      async def delete(self, key: str) -> bool:
          await self._purge_expired()
          if key not in self._items:
              return False

          self._items.pop(key)
          self._dump_to_disk()
          return True

      async def clear(self) -> None:
          self._items.clear()
          if self._path.exists():
              self._path.unlink()

      async def reload(self) -> None:
          self._items.clear()
          self._load_from_disk()
          await self._purge_expired()

      async def _purge_expired(self) -> None:
          now = time.time()
          expired_keys = [
              key for key, (_, expires_at) in list(self._items.items()) if expires_at is not None and expires_at <= now
          ]
          for key in expired_keys:
              self._items.pop(key, None)
          if expired_keys:
              self._dump_to_disk()

      async def _enforce_capacity(self) -> None:
          while len(self._items) > self._size:
              self._items.popitem(last=False)  # drop the least recently used entry

      def _load_from_disk(self) -> None:
          if not self._path.exists():
              return

          try:
              raw = json.loads(self._path.read_text())
          except json.JSONDecodeError:
              return

          now = time.time()
          for key, payload in raw.items():
              expires_at = payload.get("expires_at")
              if expires_at is not None and expires_at <= now:
                  continue
              self._items[key] = (payload["value"], expires_at)

      def _dump_to_disk(self) -> None:
          self._path.parent.mkdir(parents=True, exist_ok=True)
          data = {key: {"value": value, "expires_at": expires_at} for key, (value, expires_at) in self._items.items()}
          self._path.write_text(json.dumps(data, indent=2))


  async def main() -> None:
      with tempfile.TemporaryDirectory() as tmpdir:
          path = Path(tmpdir) / "bee_cache.json"
          cache: JsonFileCache[dict[str, str]] = JsonFileCache(path, size=2, ttl=1.5)

          await cache.set("profile", {"name": "Bee", "role": "assistant"})
          await cache.set("settings", {"theme": "dark"})
          print(f"Cache persisted to {cache.source}")

          await cache.set("session", {"token": "abc123"})
          print(await cache.has("profile"))  # False -> evicted when capacity exceeded

          reloaded: JsonFileCache[dict[str, str]] = JsonFileCache(path, size=2, ttl=1.5)
          print(await reloaded.get("settings"))  # {'theme': 'dark'}

          await asyncio.sleep(1.6)
          await reloaded.reload()
          print(await reloaded.get("session"))  # None -> TTL expired


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { FileCache } from "beeai-framework/cache/fileCache";
  import * as os from "node:os";

  const cache = new FileCache({
    fullPath: `${os.tmpdir()}/bee_file_cache_${Date.now()}.json`,
  });
  console.log(`Saving cache to "${cache.source}"`);
  await cache.set("abc", { firstName: "John", lastName: "Doe" });

  ```
</CodeGroup>

#### With custom provider

Seed a file-backed cache from another provider when you want to warm the disk cache before first use or promote hot data captured in memory. The example below clones an `UnconstrainedCache` into the JSON file cache so new processes can reuse it immediately.

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import tempfile
  import traceback
  from pathlib import Path
  from typing import TypeVar

  from beeai_framework.cache import UnconstrainedCache
  from beeai_framework.errors import FrameworkError
  from examples.cache.file_cache import JsonFileCache

  T = TypeVar("T")


  async def export_cache(provider: UnconstrainedCache[T]) -> dict[str, T]:
      """Clone an in-memory cache so that we can safely persist its content."""
      cloned = await provider.clone()
      # UnconstrainedCache stores entries in a simple dict, so cloning is inexpensive here.
      return getattr(cloned, "_provider", {}).copy()


  async def main() -> None:
      memory_cache: UnconstrainedCache[int] = UnconstrainedCache()
      await memory_cache.set("tasks:open", 7)
      await memory_cache.set("tasks:closed", 12)

      with tempfile.TemporaryDirectory() as tmpdir:
          path = Path(tmpdir) / "bee_cache.json"

          file_cache = await JsonFileCache.from_mapping(path, await export_cache(memory_cache), size=10, ttl=10)
          print(f"Promoted cache to disk: {file_cache.source}")

          print(await file_cache.get("tasks:open"))  # 7
          await file_cache.set("tasks:stale", 1)
          print(await file_cache.size())  # 3

          reloaded: JsonFileCache[int] = JsonFileCache(path, size=10, ttl=10)
          print(await reloaded.get("tasks:closed"))  # 12


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { FileCache } from "beeai-framework/cache/fileCache";
  import { UnconstrainedCache } from "beeai-framework/cache/unconstrainedCache";
  import os from "node:os";

  const memoryCache = new UnconstrainedCache<number>();
  await memoryCache.set("a", 1);

  const fileCache = await FileCache.fromProvider(memoryCache, {
    fullPath: `${os.tmpdir()}/bee_file_cache.json`,
  });
  console.log(`Saving cache to "${fileCache.source}"`);
  console.log(await fileCache.get("a")); // 1

  ```
</CodeGroup>

### NullCache

A special cache that implements the `BaseCache` interface but performs no caching. Useful for testing or temporarily disabling caching.

It exists to support the [Null object pattern](https://en.wikipedia.org/wiki/Null_object_pattern): code that expects a cache can always call it, without first checking whether caching is configured.
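
To illustrate why a no-op cache is handy, here is a minimal standalone sketch of the pattern. `CacheLike`, `DictCache`, `NoopCache`, and `count_computations` are hypothetical names invented for this illustration and are not part of the framework. The point is only that the calling code contains no `if cache is None` branch.

```python
import asyncio
from typing import Optional, Protocol


class CacheLike(Protocol):
    """Hypothetical minimal interface, loosely modelled on the methods shown on this page."""

    async def get(self, key: str) -> Optional[int]: ...

    async def set(self, key: str, value: int) -> None: ...


class DictCache:
    """Real caching backed by a plain dict."""

    def __init__(self) -> None:
        self._items: dict[str, int] = {}

    async def get(self, key: str) -> Optional[int]:
        return self._items.get(key)

    async def set(self, key: str, value: int) -> None:
        self._items[key] = value


class NoopCache:
    """Null object: accepts every call but never stores anything."""

    async def get(self, key: str) -> Optional[int]:
        return None

    async def set(self, key: str, value: int) -> None:
        return None


async def count_computations(cache: CacheLike) -> int:
    """Run the same lookup twice and report how often the work was actually done."""
    computations = 0

    async def square(n: int) -> int:
        nonlocal computations
        cached = await cache.get(str(n))
        if cached is not None:
            return cached
        computations += 1
        result = n * n
        await cache.set(str(n), result)
        return result

    await square(12)
    await square(12)
    return computations


print(asyncio.run(count_computations(DictCache())))  # 1: the second call is a cache hit
print(asyncio.run(count_computations(NoopCache())))  # 2: nothing is ever cached
```

Swapping the cache argument is the only change between the two runs, which is what makes a null cache convenient in tests.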

***

## Advanced usage

### Cache decorator

Create a reusable decorator when you want to keep caching logic close to your functions without wiring cache calls manually.

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import sys
  import time
  import traceback

  from beeai_framework.cache import SlidingCache, cached
  from beeai_framework.errors import FrameworkError

  request_cache: SlidingCache[str] = SlidingCache(size=8, ttl=2)


  class ReportGenerator:
      def __init__(self) -> None:
          self._call_counter = 0

      @cached(request_cache)
      async def generate(self, department: str) -> str:
          self._call_counter += 1
          await asyncio.sleep(0.1)
          timestamp = time.time()
          return f"{department}:{self._call_counter}@{timestamp:.0f}"


  async def main() -> None:
      generator = ReportGenerator()
      first = await generator.generate("sales")
      second = await generator.generate("sales")
      print(first == second)  # True -> cached result

      await asyncio.sleep(2.1)  # TTL expired
      third = await generator.generate("sales")
      print(first == third)  # False -> cache miss, recomputed


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { Cache } from "beeai-framework/cache/decoratorCache";

  class Generator {
    @Cache()
    get(seed: number) {
      return (Math.random() * 1000) / Math.max(seed, 1);
    }
  }

  const generator = new Generator();
  const a = generator.get(5);
  const b = generator.get(5);
  console.info(a === b); // true
  console.info(a === generator.get(6)); // false

  ```
</CodeGroup>

For more complex caching logic, you can customize key generation. Use custom key builders to partition cache entries per tenant or time window, and clear the cache in response to deployment events.

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import datetime as dt
  import random
  import sys
  import traceback
  from typing import Any

  from beeai_framework.cache import BaseCache, SlidingCache, cached
  from beeai_framework.errors import FrameworkError

  activity_cache: SlidingCache[dict[str, Any]] = SlidingCache(size=16, ttl=5)


  def session_cache_key(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
      user_id = kwargs.get("user_id") or args[0]
      scope = kwargs.get("scope", "default")
      bucket: int | None = kwargs.get("minute_bucket")
      payload = {"user_id": user_id, "scope": scope}
      if bucket is not None:
          payload["minute_bucket"] = bucket
      return BaseCache.generate_key(payload)


  class FeatureFlagService:
      def __init__(self, *, caching_enabled: bool = True) -> None:
          self._enabled = caching_enabled
          self._db_hits = 0

      @cached(activity_cache, enabled=True, key_fn=session_cache_key)
      async def load_flags(
          self, user_id: str, scope: str = "default", minute_bucket: int | None = None
      ) -> dict[str, Any]:
          self._db_hits += 1
          await asyncio.sleep(0.05)
          return {
              "user": user_id,
              "scope": scope,
              "db_hits": self._db_hits,
              "flags": {"beta_search": random.choice([True, False])},
              "refreshed_at": dt.datetime.now(dt.UTC).isoformat(timespec="seconds"),
          }


  async def main() -> None:
      service = FeatureFlagService()
      bucket = int(dt.datetime.now(dt.UTC).timestamp() // 60)

      first = await service.load_flags("42", scope="admin", minute_bucket=bucket)
      second = await service.load_flags("42", scope="admin", minute_bucket=bucket)
      print(first == second)  # True -> same cache key within a minute bucket

      await activity_cache.clear()  # Manual invalidation when new feature set deployed
      refreshed = await service.load_flags("42", scope="admin", minute_bucket=bucket)
      print(refreshed["db_hits"])  # 2 -> cache miss due to clear

      # Changing scope hits a different cache entry without flushing existing data.
      other_scope = await service.load_flags("42", scope="viewer", minute_bucket=bucket)
      print(other_scope["scope"])  # viewer


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { Cache, SingletonCacheKeyFn } from "beeai-framework/cache/decoratorCache";

  class MyService {
    @Cache({
      cacheKey: SingletonCacheKeyFn,
      ttl: 3600,
      enumerable: true,
      enabled: true,
    })
    get id() {
      return Math.floor(Math.random() * 1000);
    }

    reset() {
      Cache.getInstance(this, "id").clear();
    }
  }

  const service = new MyService();
  const a = service.id;
  console.info(a === service.id); // true
  service.reset();
  console.info(a === service.id); // false

  ```
</CodeGroup>
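
As a rough sketch of the idea behind partitioning by tenant and time window, the standalone function below derives a deterministic key from its inputs. `build_cache_key` and its parameters are hypothetical and use only the standard library; the framework's own key generation may work differently.

```python
import hashlib
import json
from typing import Any


def build_cache_key(tenant: str, params: dict[str, Any], *, now: float, window_seconds: int = 60) -> str:
    """Hypothetical key builder: same tenant + params + time window -> same key."""
    payload = {
        "tenant": tenant,
        "params": params,
        "window": int(now // window_seconds),  # entries roll over when the window changes
    }
    # sort_keys makes the key independent of dict insertion order.
    encoded = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


key_a = build_cache_key("acme", {"scope": "admin", "user": "42"}, now=120.0)
key_b = build_cache_key("acme", {"user": "42", "scope": "admin"}, now=179.9)
key_c = build_cache_key("acme", {"user": "42", "scope": "admin"}, now=180.0)
key_d = build_cache_key("globex", {"user": "42", "scope": "admin"}, now=120.0)

print(key_a == key_b)  # True: same tenant, same params, same 60-second window
print(key_a == key_c)  # False: the window rolled over
print(key_a == key_d)  # False: different tenant
```

Putting the window index into the key gives a coarse form of expiry even without a TTL, at the cost of a burst of cache misses each time the window changes.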

### CacheFn helper

For more dynamic caching needs, the `CacheFn` helper provides a functional approach. It is well suited to API tokens or other resources that return an expiry with each refresh: call `update_ttl` (`updateTTL` in TypeScript) before returning the value so the cache matches the upstream lifetime.

<CodeGroup>
  ```py Python [expandable] theme={null}
  import asyncio
  import random
  import sys
  import traceback

  from typing_extensions import TypedDict

  from beeai_framework.cache import CacheFn
  from beeai_framework.errors import FrameworkError


  class TokenResponse(TypedDict):
      token: str
      expires_in: float


  async def main() -> None:
      async def fetch_api_token() -> str:
          response: TokenResponse = {"token": f"TOKEN-{random.randint(1000, 9999)}", "expires_in": 0.2}
          get_token.update_ttl(response["expires_in"])
          await asyncio.sleep(0.05)
          return response["token"]

      get_token = CacheFn.create(fetch_api_token, default_ttl=0.1)

      first = await get_token()
      second = await get_token()
      print(first == second)  # True -> cached value

      await asyncio.sleep(0.25)
      refreshed = await get_token()
      print(first == refreshed)  # False -> TTL elapsed, value refreshed


  if __name__ == "__main__":
      try:
          asyncio.run(main())
      except FrameworkError as e:
          traceback.print_exc()
          sys.exit(e.explain())

  ```

  ```ts TypeScript [expandable] theme={null}
  import { CacheFn } from "beeai-framework/cache/decoratorCache";
  import { setTimeout } from "node:timers/promises";

  const getSecret = CacheFn.create(
    async () => {
      // in real code you would perform a fetch request instead of mocking the response
      const response = await Promise.resolve({ secret: Math.random(), expiresIn: 100 });
      getSecret.updateTTL(response.expiresIn);
      return response.secret;
    },
    {}, // options object
  );

  const token = await getSecret();
  console.info(token === (await getSecret())); // true
  await setTimeout(150);
  console.info(token === (await getSecret())); // false

  ```
</CodeGroup>
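
The mechanism can be sketched without the framework. The `ExpiringCall` class below is a hypothetical, synchronous illustration, not how `CacheFn` is implemented. It assumes the expiry is computed after the wrapped function returns, so a TTL set during the call applies to the value just fetched, and it uses a fake clock instead of sleeping.

```python
import time
from collections.abc import Callable
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class ExpiringCall(Generic[T]):
    """Hypothetical helper: caches one zero-argument call until its TTL elapses."""

    def __init__(
        self,
        fn: Callable[[], T],
        default_ttl: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fn = fn
        self._ttl = default_ttl
        self._clock = clock
        self._value: Optional[T] = None
        self._expires_at: Optional[float] = None  # None means "nothing cached yet"

    def update_ttl(self, ttl: float) -> None:
        """Meant to be called from inside fn so the expiry matches the upstream lifetime."""
        self._ttl = ttl

    def __call__(self) -> T:
        now = self._clock()
        if self._expires_at is not None and now < self._expires_at:
            return self._value  # type: ignore[return-value]
        self._value = self._fn()
        # Expiry is computed after fn ran, so a TTL set during the call takes effect.
        self._expires_at = now + self._ttl
        return self._value


fake_now = [0.0]  # mutable cell read by the fake clock
issued: list[str] = []


def fetch_token() -> str:
    token = f"TOKEN-{len(issued) + 1}"
    issued.append(token)
    get_token.update_ttl(30.0)  # pretend the upstream said "valid for 30 seconds"
    return token


get_token = ExpiringCall(fetch_token, default_ttl=5.0, clock=lambda: fake_now[0])

print(get_token())  # TOKEN-1
fake_now[0] = 20.0
print(get_token())  # TOKEN-1: still valid because the TTL was raised to 30 seconds
fake_now[0] = 31.0
print(get_token())  # TOKEN-2: the entry expired, so the function ran again
```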

***

## Creating a custom cache provider

You can create your own cache implementation by extending the `BaseCache` class:

<CodeGroup>
  ```py Python [expandable] theme={null}
  from typing import TypeVar

  from beeai_framework.cache import BaseCache

  T = TypeVar("T")


  class CustomCache(BaseCache[T]):
      async def size(self) -> int:
          raise NotImplementedError("CustomCache 'size' not yet implemented")

      # pyrefly: ignore [bad-param-name-override]
      async def set(self, _key: str, _value: T) -> None:
          raise NotImplementedError("CustomCache 'set' not yet implemented")

      async def get(self, key: str) -> T | None:
          raise NotImplementedError("CustomCache 'get' not yet implemented")

      async def has(self, key: str) -> bool:
          raise NotImplementedError("CustomCache 'has' not yet implemented")

      async def delete(self, key: str) -> bool:
          raise NotImplementedError("CustomCache 'delete' not yet implemented")

      async def clear(self) -> None:
          raise NotImplementedError("CustomCache 'clear' not yet implemented")

  ```

  ```ts TypeScript [expandable] theme={null}
  import { BaseCache } from "beeai-framework/cache/base";
  import { NotImplementedError } from "beeai-framework/errors";

  export class CustomCache<T> extends BaseCache<T> {
    size(): Promise<number> {
      throw new NotImplementedError();
    }

    set(key: string, value: T): Promise<void> {
      throw new NotImplementedError();
    }

    get(key: string): Promise<T | undefined> {
      throw new NotImplementedError();
    }

    has(key: string): Promise<boolean> {
      throw new NotImplementedError();
    }

    delete(key: string): Promise<boolean> {
      throw new NotImplementedError();
    }

    clear(): Promise<void> {
      throw new NotImplementedError();
    }

    createSnapshot() {
      throw new NotImplementedError();
    }

    loadSnapshot(snapshot: ReturnType<typeof this.createSnapshot>): void {
      throw new NotImplementedError();
    }
  }

  ```
</CodeGroup>
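
The stubs above only raise `NotImplementedError`. As a hedged illustration of what the six methods might do once filled in, here is a dict-backed sketch. `DictBackedCache` is a hypothetical name, and the class deliberately does not subclass `BaseCache` so that it runs without the framework installed; a real provider would extend `BaseCache[T]` as shown above.

```python
import asyncio
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class DictBackedCache(Generic[T]):
    """Hypothetical dict-backed store exposing the same six methods as the stub."""

    def __init__(self) -> None:
        self._items: dict[str, T] = {}

    async def size(self) -> int:
        return len(self._items)

    async def set(self, key: str, value: T) -> None:
        self._items[key] = value

    async def get(self, key: str) -> Optional[T]:
        return self._items.get(key)

    async def has(self, key: str) -> bool:
        return key in self._items

    async def delete(self, key: str) -> bool:
        # Report whether an entry was actually removed.
        if key not in self._items:
            return False
        del self._items[key]
        return True

    async def clear(self) -> None:
        self._items.clear()


async def main() -> list[object]:
    cache: DictBackedCache[int] = DictBackedCache()
    await cache.set("a", 1)
    await cache.set("b", 2)
    results: list[object] = [
        await cache.get("a"),
        await cache.has("c"),
        await cache.delete("a"),
        await cache.size(),
    ]
    await cache.clear()
    results.append(await cache.size())
    return results


print(asyncio.run(main()))  # [1, False, True, 1, 0]
```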

***

## Examples

<CardGroup cols={2}>
  <Card title="Python" icon="python" href="https://github.com/i-am-bee/beeai-framework/tree/main/python/examples/cache">
    Explore reference cache implementations in Python
  </Card>

  <Card title="TypeScript" icon="js" href="https://github.com/i-am-bee/beeai-framework/tree/main/typescript/examples/cache">
    Explore reference cache implementations in TypeScript
  </Card>
</CardGroup>
