Mirror of https://github.com/mudler/LocalAI.git, synced 2025-12-25 07:29:07 -05:00
* feat(mlx): add thread-safe LRU prompt cache

  Port mlx-lm's LRUPromptCache to fix a race condition where concurrent
  requests corrupt shared KV cache state. The previous implementation used a
  single prompt_cache instance shared across all requests.

  Changes:
  - Add backend/python/common/mlx_cache.py with ThreadSafeLRUPromptCache
  - Modify backend.py to use per-request cache isolation via fetch/insert
  - Add prefix matching for cache reuse across similar prompts
  - Add LRU eviction (default 10 entries, configurable)
  - Add concurrency and cache unit tests

  The cache uses a trie-based structure for efficient prefix matching,
  allowing prompts that share common prefixes to reuse cached KV states.
  Thread safety is provided via threading.Lock.

  New configuration options:
  - max_cache_entries: Maximum LRU cache entries (default: 10)
  - max_kv_size: Maximum KV cache size per entry (default: None)

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: Blightbow <blightbow@users.noreply.github.com>

* feat(mlx): add min_p and top_k sampler support

  Add MinP field to proto (field 52) following the precedent set by other
  non-OpenAI sampling parameters like TopK, TailFreeSamplingZ, TypicalP, and
  Mirostat.

  Changes:
  - backend.proto: Add float MinP field for min-p sampling
  - backend.py: Extract and pass min_p and top_k to the mlx_lm sampler
    (top_k was in the proto but not being passed)
  - test.py: Fix test_sampling_params to use valid proto fields and switch
    to an MLX-compatible model (mlx-community/Llama-3.2-1B-Instruct)

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: Blightbow <blightbow@users.noreply.github.com>

* refactor(mlx): move mlx_cache.py from common to mlx backend

  The ThreadSafeLRUPromptCache is only used by the mlx backend. After
  evaluating mlx-vlm, it was determined that the cache cannot be shared
  because mlx-vlm's generate/stream_generate functions don't support the
  prompt_cache parameter that mlx_lm provides.

  - Move mlx_cache.py from backend/python/common/ to backend/python/mlx/
  - Remove sys.path manipulation from backend.py and test.py
  - Fix test assertion to expect "MLX model loaded successfully"

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: Blightbow <blightbow@users.noreply.github.com>

* test(mlx): add comprehensive cache tests and document upstream behavior

  Added comprehensive unit tests (test_mlx_cache.py) covering all cache
  operation modes:
  - Exact match
  - Shorter prefix match
  - Longer prefix match with trimming
  - No match scenarios
  - LRU eviction and access order
  - Reference counting and deep copy behavior
  - Multi-model namespacing
  - Thread safety with data integrity verification

  Documents upstream mlx_lm/server.py behavior: single-token prefixes are
  deliberately not matched (uses > 0, not >= 0) to allow longer cached
  sequences to be preferred for trimming. This is acceptable because real
  prompts with chat templates are always many tokens.

  Removed weak unit tests from test.py that only verified "no exception
  thrown" rather than correctness.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: Blightbow <blightbow@users.noreply.github.com>

* chore(mlx): remove unused MinP proto field

  The MinP field was added to PredictOptions but is not populated by the Go
  frontend/API. The MLX backend uses getattr with a default value, so it
  works without the proto field.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: Blightbow <blightbow@users.noreply.github.com>

---------

Signed-off-by: Blightbow <blightbow@users.noreply.github.com>
Co-authored-by: Blightbow <blightbow@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
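For context, here is a minimal sketch of the per-request fetch/insert pattern the first commit describes. It uses only the ThreadSafeLRUPromptCache API exercised by the tests below (the max_size constructor argument, fetch_nearest_cache, insert_cache); the handle_request wrapper, the list used as a mock KV cache, and the keying choice are illustrative assumptions, not backend.py's actual wiring.

```python
# Hedged sketch: per-request cache isolation via fetch/insert.
# Assumes mlx_cache is importable (as in the test file); the mock KV cache
# is a plain list, as in the tests below.
from mlx_cache import ThreadSafeLRUPromptCache

cache = ThreadSafeLRUPromptCache(max_size=10)  # corresponds to max_cache_entries

def handle_request(model_key, prompt_tokens):
    # Fetch (and remove) the nearest cached prefix; removal gives this request
    # exclusive ownership, so concurrent requests cannot mutate shared state.
    kv_cache, remaining = cache.fetch_nearest_cache(model_key, prompt_tokens)
    if kv_cache is None:
        kv_cache, remaining = ["fresh_kv"], prompt_tokens

    # Only `remaining` (the uncached suffix) would need to be prefilled by the
    # model; here we just record it on the mock cache object.
    kv_cache.append(f"prefilled:{remaining}")

    # Re-insert the cache keyed by this request's token sequence so later
    # requests sharing the prefix can reuse it; LRU eviction bounds the size.
    cache.insert_cache(model_key, prompt_tokens, kv_cache)
    return kv_cache

# Two requests sharing a prefix: the second reuses the first one's KV state.
first = handle_request("model", [1, 2, 3])
second = handle_request("model", [1, 2, 3, 4, 5])
```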
481 lines · 18 KiB · Python
"""
|
|
Comprehensive unit tests for ThreadSafeLRUPromptCache.
|
|
|
|
Tests all cache operation modes:
|
|
- Exact match
|
|
- Shorter prefix match
|
|
- Longer prefix match (with trimming)
|
|
- No match
|
|
- LRU eviction
|
|
- Reference counting
|
|
- Multi-model namespacing
|
|
- Thread safety with data integrity verification
|
|
"""
|
|
import unittest
|
|
import concurrent.futures
|
|
import threading
|
|
import copy
|
|
from mlx_cache import ThreadSafeLRUPromptCache
|
|
|
|
|
|
class TestCacheExactMatch(unittest.TestCase):
|
|
"""Tests for exact match cache behavior."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
def test_exact_match_returns_cache_and_empty_remaining(self):
|
|
"""Exact match should return the cache with no remaining tokens."""
|
|
tokens = [1, 2, 3, 4, 5]
|
|
mock_cache = ["kv_cache_data"]
|
|
|
|
self.cache.insert_cache("model1", tokens, mock_cache)
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", tokens)
|
|
|
|
self.assertEqual(result_cache, mock_cache)
|
|
self.assertEqual(remaining, [])
|
|
|
|
def test_exact_match_extracts_and_removes_from_cache(self):
|
|
"""Fetching exact match with count=1 should remove entry from cache."""
|
|
tokens = [1, 2, 3]
|
|
self.cache.insert_cache("model1", tokens, ["cache"])
|
|
|
|
self.assertEqual(len(self.cache), 1)
|
|
|
|
# First fetch extracts the entry
|
|
self.cache.fetch_nearest_cache("model1", tokens)
|
|
|
|
# Cache should now be empty
|
|
self.assertEqual(len(self.cache), 0)
|
|
|
|
# Second fetch should return None (no match)
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", tokens)
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, tokens)
|
|
|
|
|
|
class TestCacheShorterPrefix(unittest.TestCase):
|
|
"""Tests for shorter prefix match behavior."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
def test_shorter_prefix_returns_cache_with_remaining_tokens(self):
|
|
"""When cached prefix is shorter, return cache and remaining suffix."""
|
|
short_tokens = [1, 2, 3]
|
|
long_tokens = [1, 2, 3, 4, 5, 6]
|
|
mock_cache = ["prefix_cache"]
|
|
|
|
self.cache.insert_cache("model1", short_tokens, mock_cache)
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", long_tokens)
|
|
|
|
self.assertEqual(result_cache, mock_cache)
|
|
self.assertEqual(remaining, [4, 5, 6])
|
|
|
|
def test_shorter_prefix_correct_remaining_calculation(self):
|
|
"""Verify remaining tokens are calculated correctly for various prefix lengths."""
|
|
# Note: Single-token prefixes ([1] -> [1,2,3]) are deliberately not matched
|
|
# to allow longer cached sequences to be preferred for trimming.
|
|
# This matches upstream mlx_lm/server.py behavior.
|
|
test_cases = [
|
|
# (cached_tokens, requested_tokens, expected_remaining)
|
|
([1, 2], [1, 2, 3, 4, 5], [3, 4, 5]),
|
|
([10, 20, 30, 40], [10, 20, 30, 40, 50], [50]),
|
|
]
|
|
|
|
for cached, requested, expected_remaining in test_cases:
|
|
with self.subTest(cached=cached, requested=requested):
|
|
cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
cache.insert_cache("model", cached, ["cache"])
|
|
result_cache, remaining = cache.fetch_nearest_cache("model", requested)
|
|
|
|
self.assertIsNotNone(result_cache)
|
|
self.assertEqual(remaining, expected_remaining)
|
|
|
|
def test_single_token_prefix_not_matched(self):
|
|
"""Single-token prefixes are not matched (by design, matches upstream).
|
|
|
|
This allows longer cached sequences to be preferred for trimming,
|
|
which provides better KV cache reuse. Single-token caches are rare
|
|
in practice since real prompts with chat templates are many tokens.
|
|
"""
|
|
cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
cache.insert_cache("model", [1], ["cache"])
|
|
|
|
result_cache, remaining = cache.fetch_nearest_cache("model", [1, 2, 3])
|
|
|
|
# Single-token prefix is NOT matched
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, [1, 2, 3])
|
|
|
|
|
|
class TestCacheLongerPrefix(unittest.TestCase):
|
|
"""Tests for longer prefix match behavior (trimming)."""
|
|
|
|
def setUp(self):
|
|
# Track trim calls for verification
|
|
self.trim_calls = []
|
|
|
|
def mock_can_trim(cache):
|
|
return True
|
|
|
|
def mock_trim(cache, num_to_trim):
|
|
self.trim_calls.append(num_to_trim)
|
|
# Simulate trimming by modifying the cache
|
|
cache.append(f"trimmed_{num_to_trim}")
|
|
|
|
self.cache = ThreadSafeLRUPromptCache(
|
|
max_size=10,
|
|
can_trim_fn=mock_can_trim,
|
|
trim_fn=mock_trim,
|
|
)
|
|
|
|
def test_longer_prefix_triggers_trim(self):
|
|
"""When cached sequence is longer, should trim to match requested prefix."""
|
|
long_tokens = [1, 2, 3, 4, 5]
|
|
short_tokens = [1, 2, 3]
|
|
|
|
self.cache.insert_cache("model1", long_tokens, ["original_cache"])
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", short_tokens)
|
|
|
|
# Should have called trim
|
|
self.assertTrue(len(self.trim_calls) > 0, "trim_fn should have been called")
|
|
# Result should be a trimmed copy, not the original
|
|
self.assertIn("trimmed_", str(result_cache))
|
|
|
|
def test_longer_prefix_without_trim_fn_returns_no_match(self):
|
|
"""Without trim functions, longer prefix should not match."""
|
|
cache_no_trim = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
long_tokens = [1, 2, 3, 4, 5]
|
|
short_tokens = [1, 2, 3]
|
|
|
|
cache_no_trim.insert_cache("model1", long_tokens, ["cache"])
|
|
result_cache, remaining = cache_no_trim.fetch_nearest_cache("model1", short_tokens)
|
|
|
|
# Without trim_fn, should return no match
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, short_tokens)
|
|
|
|
def test_longer_prefix_can_trim_false_returns_no_match(self):
|
|
"""When can_trim_fn returns False, should not attempt trim."""
|
|
cache = ThreadSafeLRUPromptCache(
|
|
max_size=10,
|
|
can_trim_fn=lambda c: False,
|
|
trim_fn=lambda c, n: None,
|
|
)
|
|
|
|
cache.insert_cache("model1", [1, 2, 3, 4, 5], ["cache"])
|
|
result_cache, remaining = cache.fetch_nearest_cache("model1", [1, 2, 3])
|
|
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, [1, 2, 3])
|
|
|
|
|
|
class TestCacheNoMatch(unittest.TestCase):
|
|
"""Tests for no match behavior."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
def test_empty_cache_returns_none(self):
|
|
"""Empty cache should return None and all tokens as remaining."""
|
|
tokens = [1, 2, 3]
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", tokens)
|
|
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, tokens)
|
|
|
|
def test_different_prefix_returns_none(self):
|
|
"""Tokens with different prefix should not match."""
|
|
self.cache.insert_cache("model1", [1, 2, 3], ["cache"])
|
|
|
|
# Completely different tokens
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", [4, 5, 6])
|
|
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, [4, 5, 6])
|
|
|
|
def test_partial_prefix_mismatch_returns_none(self):
|
|
"""Tokens that diverge mid-sequence should not match."""
|
|
self.cache.insert_cache("model1", [1, 2, 3], ["cache"])
|
|
|
|
# Same start but diverges
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model1", [1, 2, 99])
|
|
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, [1, 2, 99])
|
|
|
|
def test_wrong_model_returns_none(self):
|
|
"""Different model key should not match."""
|
|
self.cache.insert_cache("model1", [1, 2, 3], ["cache"])
|
|
|
|
result_cache, remaining = self.cache.fetch_nearest_cache("model2", [1, 2, 3])
|
|
|
|
self.assertIsNone(result_cache)
|
|
self.assertEqual(remaining, [1, 2, 3])
|
|
|
|
|
|
class TestCacheLRUEviction(unittest.TestCase):
|
|
"""Tests for LRU eviction behavior."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=3)
|
|
|
|
def test_evicts_oldest_when_full(self):
|
|
"""Should evict least recently used entry when capacity exceeded."""
|
|
self.cache.insert_cache("model", [1], ["cache1"])
|
|
self.cache.insert_cache("model", [2], ["cache2"])
|
|
self.cache.insert_cache("model", [3], ["cache3"])
|
|
|
|
self.assertEqual(len(self.cache), 3)
|
|
|
|
# Insert 4th entry - should evict [1]
|
|
self.cache.insert_cache("model", [4], ["cache4"])
|
|
|
|
self.assertEqual(len(self.cache), 3)
|
|
|
|
# [1] should be evicted
|
|
result, _ = self.cache.fetch_nearest_cache("model", [1])
|
|
self.assertIsNone(result)
|
|
|
|
# [2], [3], [4] should still exist
|
|
for tokens in [[2], [3], [4]]:
|
|
# Re-insert since fetch extracts
|
|
self.cache.insert_cache("model", tokens, [f"cache{tokens[0]}"])
|
|
|
|
result2, _ = self.cache.fetch_nearest_cache("model", [2])
|
|
self.assertIsNotNone(result2)
|
|
|
|
def test_access_updates_lru_order(self):
|
|
"""Accessing an entry should move it to most recently used."""
|
|
self.cache.insert_cache("model", [1], ["cache1"])
|
|
self.cache.insert_cache("model", [2], ["cache2"])
|
|
self.cache.insert_cache("model", [3], ["cache3"])
|
|
|
|
# Access [1] to make it most recently used
|
|
cache1, _ = self.cache.fetch_nearest_cache("model", [1])
|
|
# Re-insert it (simulating normal usage pattern)
|
|
self.cache.insert_cache("model", [1], cache1)
|
|
|
|
# Now insert two more entries - should evict [2] then [3], not [1]
|
|
self.cache.insert_cache("model", [4], ["cache4"])
|
|
self.cache.insert_cache("model", [5], ["cache5"])
|
|
|
|
# [1] should still exist (was accessed, so not evicted)
|
|
result1, _ = self.cache.fetch_nearest_cache("model", [1])
|
|
self.assertIsNotNone(result1)
|
|
|
|
# [2] should be evicted (was oldest after [1] was accessed)
|
|
result2, _ = self.cache.fetch_nearest_cache("model", [2])
|
|
self.assertIsNone(result2)
|
|
|
|
|
|
class TestCacheReferenceCount(unittest.TestCase):
|
|
"""Tests for reference counting behavior."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
def test_multiple_inserts_increment_count(self):
|
|
"""Inserting same tokens multiple times should increment count."""
|
|
tokens = [1, 2, 3]
|
|
|
|
self.cache.insert_cache("model", tokens, ["cache"])
|
|
self.cache.insert_cache("model", tokens, ["cache"])
|
|
self.cache.insert_cache("model", tokens, ["cache"])
|
|
|
|
# Should still be one entry (with count=3 internally)
|
|
self.assertEqual(len(self.cache), 1)
|
|
|
|
# First two fetches should return copies (count decremented)
|
|
result1, _ = self.cache.fetch_nearest_cache("model", tokens)
|
|
self.assertIsNotNone(result1)
|
|
|
|
result2, _ = self.cache.fetch_nearest_cache("model", tokens)
|
|
self.assertIsNotNone(result2)
|
|
|
|
# Third fetch extracts the last reference
|
|
result3, _ = self.cache.fetch_nearest_cache("model", tokens)
|
|
self.assertIsNotNone(result3)
|
|
|
|
# Fourth fetch should return None (entry fully extracted)
|
|
result4, _ = self.cache.fetch_nearest_cache("model", tokens)
|
|
self.assertIsNone(result4)
|
|
|
|
def test_extract_with_high_count_returns_deep_copy(self):
|
|
"""When count > 1, extract should return a deep copy."""
|
|
tokens = [1, 2, 3]
|
|
original_cache = [{"nested": "data"}]
|
|
|
|
self.cache.insert_cache("model", tokens, original_cache)
|
|
self.cache.insert_cache("model", tokens, original_cache) # count=2
|
|
|
|
result1, _ = self.cache.fetch_nearest_cache("model", tokens)
|
|
|
|
# Modify the returned cache
|
|
result1[0]["nested"] = "modified"
|
|
|
|
# Second fetch should get unmodified copy
|
|
result2, _ = self.cache.fetch_nearest_cache("model", tokens)
|
|
self.assertEqual(result2[0]["nested"], "data")
|
|
|
|
|
|
class TestCacheMultiModel(unittest.TestCase):
|
|
"""Tests for multi-model namespacing."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
def test_same_tokens_different_models_are_separate(self):
|
|
"""Same token sequence under different models should be independent."""
|
|
tokens = [1, 2, 3]
|
|
|
|
self.cache.insert_cache("model_a", tokens, ["cache_a"])
|
|
self.cache.insert_cache("model_b", tokens, ["cache_b"])
|
|
|
|
self.assertEqual(len(self.cache), 2)
|
|
|
|
result_a, _ = self.cache.fetch_nearest_cache("model_a", tokens)
|
|
result_b, _ = self.cache.fetch_nearest_cache("model_b", tokens)
|
|
|
|
self.assertEqual(result_a, ["cache_a"])
|
|
self.assertEqual(result_b, ["cache_b"])
|
|
|
|
def test_eviction_across_models(self):
|
|
"""LRU eviction should work across different models."""
|
|
cache = ThreadSafeLRUPromptCache(max_size=3)
|
|
|
|
cache.insert_cache("model_a", [1], ["a1"])
|
|
cache.insert_cache("model_b", [1], ["b1"])
|
|
cache.insert_cache("model_a", [2], ["a2"])
|
|
|
|
self.assertEqual(len(cache), 3)
|
|
|
|
# Insert 4th - should evict model_a:[1] (oldest)
|
|
cache.insert_cache("model_b", [2], ["b2"])
|
|
|
|
result, _ = cache.fetch_nearest_cache("model_a", [1])
|
|
self.assertIsNone(result)
|
|
|
|
|
|
class TestCacheThreadSafety(unittest.TestCase):
|
|
"""Tests for thread safety with data integrity verification."""
|
|
|
|
def test_concurrent_inserts_no_data_loss(self):
|
|
"""Concurrent inserts should not lose data."""
|
|
cache = ThreadSafeLRUPromptCache(max_size=100)
|
|
num_threads = 10
|
|
inserts_per_thread = 20
|
|
|
|
def insert_entries(thread_id):
|
|
for i in range(inserts_per_thread):
|
|
tokens = [thread_id, i]
|
|
cache.insert_cache("model", tokens, [f"cache_{thread_id}_{i}"])
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
|
|
futures = [executor.submit(insert_entries, tid) for tid in range(num_threads)]
|
|
concurrent.futures.wait(futures)
|
|
|
|
# Verify expected number of entries (may be less due to LRU eviction with max_size=100)
|
|
# But should be exactly 100 since we inserted exactly 200 and max_size is 100
|
|
self.assertEqual(len(cache), 100)
|
|
|
|
def test_concurrent_fetch_and_insert_no_corruption(self):
|
|
"""Concurrent fetches and inserts should not corrupt data."""
|
|
cache = ThreadSafeLRUPromptCache(max_size=50)
|
|
errors = []
|
|
lock = threading.Lock()
|
|
|
|
# Pre-populate with known data
|
|
for i in range(20):
|
|
cache.insert_cache("model", [i], [f"original_{i}"])
|
|
|
|
def fetch_and_verify(thread_id):
|
|
try:
|
|
for _ in range(50):
|
|
token_id = thread_id % 20
|
|
result, remaining = cache.fetch_nearest_cache("model", [token_id])
|
|
|
|
if result is not None:
|
|
# Verify data integrity
|
|
expected_prefix = f"original_{token_id}"
|
|
if not str(result[0]).startswith("original_"):
|
|
with lock:
|
|
errors.append(f"Corrupted data: {result}")
|
|
|
|
# Re-insert to keep cache populated
|
|
cache.insert_cache("model", [token_id], result)
|
|
|
|
except Exception as e:
|
|
with lock:
|
|
errors.append(str(e))
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
|
|
futures = [executor.submit(fetch_and_verify, tid) for tid in range(10)]
|
|
concurrent.futures.wait(futures)
|
|
|
|
self.assertEqual(errors, [], f"Thread safety errors: {errors}")
|
|
|
|
def test_concurrent_operations_maintain_cache_bounds(self):
|
|
"""Cache size should never exceed max_size under concurrent operations."""
|
|
max_size = 10
|
|
cache = ThreadSafeLRUPromptCache(max_size=max_size)
|
|
size_violations = []
|
|
lock = threading.Lock()
|
|
|
|
def random_operations(thread_id):
|
|
import random
|
|
for i in range(100):
|
|
tokens = [random.randint(0, 50)]
|
|
if random.random() < 0.7:
|
|
cache.insert_cache("model", tokens, [f"cache_{thread_id}_{i}"])
|
|
else:
|
|
cache.fetch_nearest_cache("model", tokens)
|
|
|
|
current_size = len(cache)
|
|
if current_size > max_size:
|
|
with lock:
|
|
size_violations.append(current_size)
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
|
|
futures = [executor.submit(random_operations, tid) for tid in range(10)]
|
|
concurrent.futures.wait(futures)
|
|
|
|
self.assertEqual(size_violations, [], f"Size exceeded max: {size_violations}")
|
|
self.assertLessEqual(len(cache), max_size)
|
|
|
|
|
|
class TestCacheClear(unittest.TestCase):
|
|
"""Tests for cache clear operation."""
|
|
|
|
def setUp(self):
|
|
self.cache = ThreadSafeLRUPromptCache(max_size=10)
|
|
|
|
def test_clear_removes_all_entries(self):
|
|
"""Clear should remove all entries."""
|
|
self.cache.insert_cache("model1", [1, 2], ["cache1"])
|
|
self.cache.insert_cache("model2", [3, 4], ["cache2"])
|
|
self.cache.insert_cache("model1", [5, 6], ["cache3"])
|
|
|
|
self.assertEqual(len(self.cache), 3)
|
|
|
|
self.cache.clear()
|
|
|
|
self.assertEqual(len(self.cache), 0)
|
|
|
|
def test_clear_allows_new_inserts(self):
|
|
"""After clear, new inserts should work normally."""
|
|
self.cache.insert_cache("model", [1], ["cache1"])
|
|
self.cache.clear()
|
|
self.cache.insert_cache("model", [2], ["cache2"])
|
|
|
|
self.assertEqual(len(self.cache), 1)
|
|
|
|
result, _ = self.cache.fetch_nearest_cache("model", [2])
|
|
self.assertEqual(result, ["cache2"])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|