import itertools
import json
import logging
import os
import shutil
from io import TextIOWrapper
from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
)
from uuid import uuid4
import numpy as np
import portalocker
from qdrant_client._pydantic_compat import to_dict
from qdrant_client.client_base import QdrantBase
from qdrant_client.conversions import common_types as types
from qdrant_client.http import models as rest_models
from qdrant_client.http.models.models import RecommendExample
from qdrant_client.local.local_collection import LocalCollection
# Name of the JSON file, stored directly inside the storage directory, that holds
# the persisted state: a "collections" mapping (name -> config) and the "aliases" table.
META_INFO_FILENAME = "meta.json"
class QdrantLocal(QdrantBase):
    """
    Local, server-less implementation of the Qdrant client interface.

    Runs vector search inside the current process, so no Qdrant server is needed.
    Anything that works against local Qdrant also works against a Qdrant server.
    Intended for small-scale data, demos, and tests; switch to a Qdrant server
    when more speed or capacity is required.
    """
def __init__(self, location: str, force_disable_check_same_thread: bool = False) -> None:
    """
    Create a local Qdrant instance and load whatever state is stored at `location`.

    Args:
        location: Where data lives. Either a directory path or `:memory:` for
            purely in-memory storage.
        force_disable_check_same_thread: Disable the SQLite check_same_thread check.
            Use only if you know what you are doing.
    """
    super().__init__()
    self.location = location
    self.force_disable_check_same_thread = force_disable_check_same_thread
    # Everything except the special ":memory:" marker is persisted on disk.
    self.persistent = location != ":memory:"
    self.aliases: Dict[str, str] = {}
    self.collections: Dict[str, LocalCollection] = {}
    # Handle of the lock file; stays None for in-memory instances.
    self._flock_file: Optional[TextIOWrapper] = None
    self._load()
    self._closed: bool = False
@property
def closed(self) -> bool:
    """Report whether `close()` has been called on this instance."""
    is_closed = self._closed
    return is_closed
def close(self, **kwargs: Any) -> None:
    """
    Mark the instance as closed, close every collection and release the storage lock.

    Extra keyword arguments are accepted and ignored.
    """
    self._closed = True

    for local_collection in self.collections.values():
        if local_collection is None:
            logging.warning(
                f"Collection appears to be None before closing. The existing collections are: "
                f"{list(self.collections.keys())}"
            )
            continue
        local_collection.close()

    lock_file = self._flock_file
    try:
        if lock_file is None or lock_file.closed:
            return
        portalocker.unlock(lock_file)
        lock_file.close()
    except TypeError:
        # sometimes portalocker module can be garbage collected before
        # QdrantLocal instance
        pass
def _load(self) -> None:
    """
    Restore persisted state from `self.location` and take the exclusive storage lock.

    In-memory instances return immediately. For persistent instances the metadata
    file is created if missing; otherwise every collection listed in it is
    re-opened and the alias table is restored. Finally a non-blocking exclusive
    lock is taken on the `.lock` file inside the storage folder.

    Raises:
        RuntimeError: if the storage folder is already locked by another client instance.
    """
    if not self.persistent:
        return

    meta_path = os.path.join(self.location, META_INFO_FILENAME)
    if not os.path.exists(meta_path):
        # First use of this location: create the folder and an empty metadata file.
        os.makedirs(self.location, exist_ok=True)
        with open(meta_path, "w") as f:
            f.write(json.dumps({"collections": {}, "aliases": {}}))
    else:
        # Read the metadata fully, then release the file before opening collections.
        with open(meta_path, "r") as f:
            meta = json.load(f)
        for collection_name, config_json in meta["collections"].items():
            config = rest_models.CreateCollection(**config_json)
            collection_path = self._collection_path(collection_name)
            self.collections[collection_name] = LocalCollection(
                config,
                collection_path,
                force_disable_check_same_thread=self.force_disable_check_same_thread,
            )
        self.aliases = meta["aliases"]

    lock_file_path = os.path.join(self.location, ".lock")
    if not os.path.exists(lock_file_path):
        os.makedirs(self.location, exist_ok=True)
        with open(lock_file_path, "w") as f:
            f.write("tmp lock file")

    lock_file = open(lock_file_path, "r+")
    try:
        portalocker.lock(
            lock_file,
            portalocker.LockFlags.EXCLUSIVE | portalocker.LockFlags.NON_BLOCKING,
        )
    except portalocker.exceptions.LockException as lock_error:
        # Do not leak the handle, and do not leave `_flock_file` pointing at a
        # file this instance never managed to lock.
        lock_file.close()
        raise RuntimeError(
            f"Storage folder {self.location} is already accessed by another instance of Qdrant client."
            f" If you require concurrent access, use Qdrant server instead."
        ) from lock_error
    # Only remember the handle once the lock is actually held.
    self._flock_file = lock_file
def _save(self) -> None:
if not self.persistent:
return
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
meta_path = os.path.join(self.location, META_INFO_FILENAME)
with open(meta_path, "w") as f:
f.write(
json.dumps(
{
"collections": {
collection_name: to_dict(collection.config)
for collection_name, collection in self.collections.items()
},
"aliases": self.aliases,
}
)
)
def _get_collection(self, collection_name: str) -> LocalCollection:
if self.closed:
raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")
if collection_name in self.collections:
return self.collections[collection_name]
if collection_name in self.aliases:
return self.collections[self.aliases[collection_name]]
raise ValueError(f"Collection {collection_name} not found")
def search_batch(
    self,
    collection_name: str,
    requests: Sequence[types.SearchRequest],
    **kwargs: Any,
) -> List[List[types.ScoredPoint]]:
    """
    Run several search requests against one collection, one after another.

    Returns one result list per request, in request order. Extra keyword
    arguments are ignored.
    """
    collection = self._get_collection(collection_name)
    batch_results = []
    for search_request in requests:
        hits = collection.search(
            query_vector=search_request.vector,
            query_filter=search_request.filter,
            limit=search_request.limit,
            offset=search_request.offset,
            with_payload=search_request.with_payload,
            with_vectors=search_request.with_vector,
            score_threshold=search_request.score_threshold,
        )
        batch_results.append(hits)
    return batch_results
def search(
    self,
    collection_name: str,
    query_vector: Union[
        types.NumpyArray,
        Sequence[float],
        Tuple[str, List[float]],
        types.NamedVector,
        types.NamedSparseVector,
    ],
    query_filter: Optional[types.Filter] = None,
    search_params: Optional[types.SearchParams] = None,
    limit: int = 10,
    offset: Optional[int] = None,
    with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, Sequence[str]] = False,
    score_threshold: Optional[float] = None,
    **kwargs: Any,
) -> List[types.ScoredPoint]:
    """
    Forward a vector search to the named collection (or alias).

    `search_params` and extra keyword arguments are accepted for interface
    compatibility but are not passed on to the collection.
    """
    return self._get_collection(collection_name).search(
        query_vector=query_vector,
        query_filter=query_filter,
        limit=limit,
        offset=offset,
        with_payload=with_payload,
        with_vectors=with_vectors,
        score_threshold=score_threshold,
    )
def search_groups(
    self,
    collection_name: str,
    query_vector: Union[
        types.NumpyArray,
        Sequence[float],
        Tuple[str, List[float]],
        types.NamedVector,
    ],
    group_by: str,
    query_filter: Optional[rest_models.Filter] = None,
    search_params: Optional[rest_models.SearchParams] = None,
    limit: int = 10,
    group_size: int = 1,
    with_payload: Union[bool, Sequence[str], rest_models.PayloadSelector] = True,
    with_vectors: Union[bool, Sequence[str]] = False,
    score_threshold: Optional[float] = None,
    with_lookup: Optional[types.WithLookupInterface] = None,
    **kwargs: Any,
) -> types.GroupsResult:
    """
    Forward a grouped vector search to the named collection.

    When `with_lookup` is given (either a collection name or an object with a
    `collection` attribute), that collection is resolved here and handed to the
    collection as `with_lookup_collection`. `search_params` and extra keyword
    arguments are not passed on.
    """
    collection = self._get_collection(collection_name)

    lookup_collection = None
    if with_lookup is not None:
        lookup_name = with_lookup if isinstance(with_lookup, str) else with_lookup.collection
        lookup_collection = self._get_collection(lookup_name)

    return collection.search_groups(
        query_vector=query_vector,
        query_filter=query_filter,
        limit=limit,
        group_by=group_by,
        group_size=group_size,
        with_payload=with_payload,
        with_vectors=with_vectors,
        score_threshold=score_threshold,
        with_lookup=with_lookup,
        with_lookup_collection=lookup_collection,
    )
def recommend_batch(
    self,
    collection_name: str,
    requests: Sequence[types.RecommendRequest],
    **kwargs: Any,
) -> List[List[types.ScoredPoint]]:
    """
    Run several recommendation requests against one collection, one after another.

    For each request that carries a `lookup_from`, the referenced collection is
    resolved and passed along with its vector name. Returns one result list per
    request, in request order.
    """
    collection = self._get_collection(collection_name)
    batch_results = []
    for recommend_request in requests:
        lookup = recommend_request.lookup_from
        if lookup:
            lookup_collection = self._get_collection(lookup.collection)
            lookup_vector_name = lookup.vector
        else:
            lookup_collection = None
            lookup_vector_name = None
        batch_results.append(
            collection.recommend(
                positive=recommend_request.positive,
                negative=recommend_request.negative,
                query_filter=recommend_request.filter,
                limit=recommend_request.limit,
                offset=recommend_request.offset,
                with_payload=recommend_request.with_payload,
                with_vectors=recommend_request.with_vector,
                score_threshold=recommend_request.score_threshold,
                using=recommend_request.using,
                lookup_from_collection=lookup_collection,
                lookup_from_vector_name=lookup_vector_name,
                strategy=recommend_request.strategy,
            )
        )
    return batch_results
def recommend(
    self,
    collection_name: str,
    positive: Optional[Sequence[RecommendExample]] = None,
    negative: Optional[Sequence[RecommendExample]] = None,
    query_filter: Optional[types.Filter] = None,
    search_params: Optional[types.SearchParams] = None,
    limit: int = 10,
    offset: int = 0,
    with_payload: Union[bool, List[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, List[str]] = False,
    score_threshold: Optional[float] = None,
    using: Optional[str] = None,
    lookup_from: Optional[types.LookupLocation] = None,
    strategy: Optional[types.RecommendStrategy] = None,
    **kwargs: Any,
) -> List[types.ScoredPoint]:
    """
    Forward a recommendation query to the named collection.

    If `lookup_from` is given, the collection it names is resolved here and
    passed on together with its vector name. `search_params` and extra keyword
    arguments are not passed on.
    """
    collection = self._get_collection(collection_name)

    lookup_collection = None
    lookup_vector_name = None
    if lookup_from:
        lookup_collection = self._get_collection(lookup_from.collection)
        lookup_vector_name = lookup_from.vector

    return collection.recommend(
        positive=positive,
        negative=negative,
        query_filter=query_filter,
        limit=limit,
        offset=offset,
        with_payload=with_payload,
        with_vectors=with_vectors,
        score_threshold=score_threshold,
        using=using,
        lookup_from_collection=lookup_collection,
        lookup_from_vector_name=lookup_vector_name,
        strategy=strategy,
    )
def recommend_groups(
    self,
    collection_name: str,
    group_by: str,
    positive: Optional[Sequence[Union[types.PointId, List[float]]]] = None,
    negative: Optional[Sequence[Union[types.PointId, List[float]]]] = None,
    query_filter: Optional[types.Filter] = None,
    search_params: Optional[types.SearchParams] = None,
    limit: int = 10,
    group_size: int = 1,
    score_threshold: Optional[float] = None,
    with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, Sequence[str]] = False,
    using: Optional[str] = None,
    lookup_from: Optional[types.LookupLocation] = None,
    with_lookup: Optional[types.WithLookupInterface] = None,
    strategy: Optional[types.RecommendStrategy] = None,
    **kwargs: Any,
) -> types.GroupsResult:
    """
    Forward a grouped recommendation query to the named collection.

    `with_lookup` (a collection name or an object with a `collection` attribute)
    and `lookup_from` are resolved to collection objects here before delegating.
    `search_params` and extra keyword arguments are not passed on.
    """
    collection = self._get_collection(collection_name)

    with_lookup_collection = None
    if with_lookup is not None:
        lookup_name = with_lookup if isinstance(with_lookup, str) else with_lookup.collection
        with_lookup_collection = self._get_collection(lookup_name)

    lookup_from_collection = None
    lookup_from_vector_name = None
    if lookup_from:
        lookup_from_collection = self._get_collection(lookup_from.collection)
        lookup_from_vector_name = lookup_from.vector

    return collection.recommend_groups(
        positive=positive,
        negative=negative,
        group_by=group_by,
        group_size=group_size,
        query_filter=query_filter,
        limit=limit,
        with_payload=with_payload,
        with_vectors=with_vectors,
        score_threshold=score_threshold,
        using=using,
        lookup_from_collection=lookup_from_collection,
        lookup_from_vector_name=lookup_from_vector_name,
        with_lookup=with_lookup,
        with_lookup_collection=with_lookup_collection,
        strategy=strategy,
    )
def discover(
    self,
    collection_name: str,
    target: Optional[types.TargetVector] = None,
    context: Optional[Sequence[types.ContextExamplePair]] = None,
    query_filter: Optional[types.Filter] = None,
    search_params: Optional[types.SearchParams] = None,
    limit: int = 10,
    offset: int = 0,
    with_payload: Union[bool, List[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, List[str]] = False,
    using: Optional[str] = None,
    lookup_from: Optional[types.LookupLocation] = None,
    consistency: Optional[types.ReadConsistency] = None,
    timeout: Optional[int] = None,
    **kwargs: Any,
) -> List[types.ScoredPoint]:
    """
    Forward a discovery query to the named collection.

    If `lookup_from` is given, the collection it names is resolved here and
    passed on together with its vector name. `search_params`, `consistency`,
    `timeout` and extra keyword arguments are not passed on.
    """
    collection = self._get_collection(collection_name)

    lookup_collection = None
    lookup_vector_name = None
    if lookup_from:
        lookup_collection = self._get_collection(lookup_from.collection)
        lookup_vector_name = lookup_from.vector

    return collection.discover(
        target=target,
        context=context,
        query_filter=query_filter,
        limit=limit,
        offset=offset,
        with_payload=with_payload,
        with_vectors=with_vectors,
        using=using,
        lookup_from_collection=lookup_collection,
        lookup_from_vector_name=lookup_vector_name,
    )
def discover_batch(
    self,
    collection_name: str,
    requests: Sequence[types.DiscoverRequest],
    **kwargs: Any,
) -> List[List[types.ScoredPoint]]:
    """
    Run several discovery requests against one collection, one after another.

    For each request that carries a `lookup_from`, the referenced collection is
    resolved and passed along with its vector name. Returns one result list per
    request, in request order.
    """
    collection = self._get_collection(collection_name)
    batch_results = []
    for discover_request in requests:
        lookup = discover_request.lookup_from
        if lookup:
            lookup_collection = self._get_collection(lookup.collection)
            lookup_vector_name = lookup.vector
        else:
            lookup_collection = None
            lookup_vector_name = None
        batch_results.append(
            collection.discover(
                target=discover_request.target,
                context=discover_request.context,
                query_filter=discover_request.filter,
                limit=discover_request.limit,
                offset=discover_request.offset,
                with_payload=discover_request.with_payload,
                with_vectors=discover_request.with_vector,
                using=discover_request.using,
                lookup_from_collection=lookup_collection,
                lookup_from_vector_name=lookup_vector_name,
            )
        )
    return batch_results
def count(
    self,
    collection_name: str,
    count_filter: Optional[types.Filter] = None,
    exact: bool = True,
    **kwargs: Any,
) -> types.CountResult:
    """
    Count the points of a collection, optionally restricted by `count_filter`.

    `exact` and extra keyword arguments are accepted but not passed on.
    """
    return self._get_collection(collection_name).count(count_filter=count_filter)
def upsert(
    self, collection_name: str, points: types.Points, **kwargs: Any
) -> types.UpdateResult:
    """Upsert `points` into the named collection and return the default update result."""
    self._get_collection(collection_name).upsert(points)
    return self._default_update_result()
def update_vectors(
    self,
    collection_name: str,
    points: Sequence[types.PointVectors],
    **kwargs: Any,
) -> types.UpdateResult:
    """Update the vectors of the given points in the named collection and return the default update result."""
    self._get_collection(collection_name).update_vectors(points)
    return self._default_update_result()
def delete_vectors(
    self,
    collection_name: str,
    vectors: Sequence[str],
    points: types.PointsSelector,
    **kwargs: Any,
) -> types.UpdateResult:
    """Delete the named `vectors` from the selected points and return the default update result."""
    self._get_collection(collection_name).delete_vectors(vectors, points)
    return self._default_update_result()
def retrieve(
    self,
    collection_name: str,
    ids: Sequence[types.PointId],
    with_payload: Union[bool, Sequence[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, Sequence[str]] = False,
    **kwargs: Any,
) -> List[types.Record]:
    """Fetch the records with the given `ids` from the named collection."""
    return self._get_collection(collection_name).retrieve(ids, with_payload, with_vectors)
@classmethod
def _default_update_result(cls, operation_id: int = 0) -> types.UpdateResult:
    """Build an update result with status COMPLETED for the given operation id."""
    completed = rest_models.UpdateStatus.COMPLETED
    return types.UpdateResult(operation_id=operation_id, status=completed)
def delete(
    self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
    """Delete the selected points from the named collection and return the default update result."""
    self._get_collection(collection_name).delete(points_selector)
    return self._default_update_result()
def set_payload(
    self,
    collection_name: str,
    payload: types.Payload,
    points: types.PointsSelector,
    key: Optional[str] = None,
    **kwargs: Any,
) -> types.UpdateResult:
    """Set `payload` (optionally under `key`) on the selected points and return the default update result."""
    self._get_collection(collection_name).set_payload(payload=payload, selector=points, key=key)
    return self._default_update_result()
def overwrite_payload(
    self,
    collection_name: str,
    payload: types.Payload,
    points: types.PointsSelector,
    **kwargs: Any,
) -> types.UpdateResult:
    """Overwrite the payload of the selected points with `payload` and return the default update result."""
    self._get_collection(collection_name).overwrite_payload(payload=payload, selector=points)
    return self._default_update_result()
def delete_payload(
    self,
    collection_name: str,
    keys: Sequence[str],
    points: types.PointsSelector,
    **kwargs: Any,
) -> types.UpdateResult:
    """Delete the payload `keys` from the selected points and return the default update result."""
    self._get_collection(collection_name).delete_payload(keys=keys, selector=points)
    return self._default_update_result()
def clear_payload(
    self, collection_name: str, points_selector: types.PointsSelector, **kwargs: Any
) -> types.UpdateResult:
    """Clear the payload of the selected points and return the default update result."""
    self._get_collection(collection_name).clear_payload(selector=points_selector)
    return self._default_update_result()
def batch_update_points(
    self,
    collection_name: str,
    update_operations: Sequence[types.UpdateOperation],
    **kwargs: Any,
) -> List[types.UpdateResult]:
    """
    Apply a sequence of update operations to the named collection.

    Returns:
        One default (COMPLETED) update result per operation. Each entry is a
        distinct object, so mutating one result does not affect the others.
    """
    collection = self._get_collection(collection_name)
    collection.batch_update_points(update_operations)
    # `[result] * n` would repeat a single shared instance n times.
    return [self._default_update_result() for _ in update_operations]
def update_collection_aliases(
    self, change_aliases_operations: Sequence[types.AliasOperations], **kwargs: Any
) -> bool:
    """
    Apply create / delete / rename alias operations in order, then persist the alias table.

    Creating an alias first checks that the target collection can be resolved.
    Deleting a missing alias is a no-op; renaming a missing alias raises KeyError.

    Raises:
        ValueError: for an operation of an unknown type, or when the target
            collection of a new alias does not exist.
    """
    for operation in change_aliases_operations:
        if isinstance(operation, rest_models.CreateAliasOperation):
            create = operation.create_alias
            # Only called for its validation side effect.
            self._get_collection(create.collection_name)
            self.aliases[create.alias_name] = create.collection_name
            continue

        if isinstance(operation, rest_models.DeleteAliasOperation):
            self.aliases.pop(operation.delete_alias.alias_name, None)
            continue

        if isinstance(operation, rest_models.RenameAliasOperation):
            rename = operation.rename_alias
            target_collection = self.aliases.pop(rename.old_alias_name)
            self.aliases[rename.new_alias_name] = target_collection
            continue

        raise ValueError(f"Unknown operation: {operation}")

    self._save()
    return True
def get_collection_aliases(
    self, collection_name: str, **kwargs: Any
) -> types.CollectionsAliasesResponse:
    """
    List the aliases that point at `collection_name`.

    Raises:
        RuntimeError: if the instance has been closed.
    """
    if self.closed:
        raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")

    matching = []
    for alias_name, name in self.aliases.items():
        if name != collection_name:
            continue
        matching.append(
            rest_models.AliasDescription(alias_name=alias_name, collection_name=name)
        )
    return types.CollectionsAliasesResponse(aliases=matching)
def get_aliases(self, **kwargs: Any) -> types.CollectionsAliasesResponse:
    """
    List every alias together with the collection it points at.

    Raises:
        RuntimeError: if the instance has been closed.
    """
    if self.closed:
        raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")

    descriptions = []
    for alias_name, name in self.aliases.items():
        descriptions.append(
            rest_models.AliasDescription(alias_name=alias_name, collection_name=name)
        )
    return types.CollectionsAliasesResponse(aliases=descriptions)
def get_collections(self, **kwargs: Any) -> types.CollectionsResponse:
    """
    List the names of all collections held by this instance.

    Raises:
        RuntimeError: if the instance has been closed.
    """
    if self.closed:
        raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")

    descriptions = []
    for name in self.collections:
        descriptions.append(rest_models.CollectionDescription(name=name))
    return types.CollectionsResponse(collections=descriptions)
def get_collection(self, collection_name: str, **kwargs: Any) -> types.CollectionInfo:
    """Return the info object of the named collection (or alias)."""
    return self._get_collection(collection_name).info()
def collection_exists(self, collection_name: str, **kwargs: Any) -> bool:
    """
    Tell whether `collection_name` resolves to a collection (directly or via an alias).

    Only the ValueError raised for an unknown name is translated to False; any
    other error from the lookup propagates.
    """
    try:
        self._get_collection(collection_name)
    except ValueError:
        return False
    return True
def update_collection(
    self,
    collection_name: str,
    sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
    **kwargs: Any,
) -> bool:
    """
    Update the sparse vector parameters of a collection.

    Only `sparse_vectors_config` is handled; other update parameters arriving
    through `kwargs` are ignored.

    Returns:
        True when `sparse_vectors_config` was provided, False otherwise.
    """
    _collection = self._get_collection(collection_name)
    if sparse_vectors_config is None:
        return False

    for vector_name, vector_params in sparse_vectors_config.items():
        _collection.update_sparse_vectors_config(vector_name, vector_params)
    # Persist the collection configs after the change; without this the metadata
    # file is never rewritten for this operation.
    self._save()
    return True
def _collection_path(self, collection_name: str) -> Optional[str]:
if self.persistent:
return os.path.join(self.location, "collection", collection_name)
else:
return None
def delete_collection(self, collection_name: str, **kwargs: Any) -> bool:
    """
    Remove a collection, every alias pointing at it, and its files on disk.

    Deleting a collection that does not exist is not an error.

    Returns:
        Always True.

    Raises:
        RuntimeError: if the instance has been closed.
    """
    if self.closed:
        raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")

    removed = self.collections.pop(collection_name, None)
    if removed is not None:
        # Close explicitly instead of relying on garbage collection, so the
        # collection is released before its directory is removed.
        removed.close()

    self.aliases = {
        alias_name: name
        for alias_name, name in self.aliases.items()
        if name != collection_name
    }

    collection_path = self._collection_path(collection_name)
    if collection_path is not None:
        shutil.rmtree(collection_path, ignore_errors=True)
    self._save()
    return True
def create_collection(
    self,
    collection_name: str,
    vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
    init_from: Optional[types.InitFrom] = None,
    sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
    **kwargs: Any,
) -> bool:
    """
    Create a new collection, optionally pre-filled from an existing one.

    Args:
        collection_name: Name of the collection to create.
        vectors_config: Dense vector parameters (single or named).
        init_from: Source collection to copy points from, given as a name or as
            an object with a `collection` attribute.
        sparse_vectors_config: Optional named sparse vector parameters.

    Returns:
        Always True.

    Raises:
        RuntimeError: if the instance has been closed.
        ValueError: if the collection already exists or the `init_from` source is unknown.
    """
    if self.closed:
        raise RuntimeError("QdrantLocal instance is closed. Please create a new instance.")

    from_collection_name = None
    if init_from is not None:
        from_collection_name = (
            init_from if isinstance(init_from, str) else init_from.collection
        )
        # Resolve the source up front so an unknown name fails before anything is created.
        self._get_collection(from_collection_name)

    if collection_name in self.collections:
        raise ValueError(f"Collection {collection_name} already exists")

    collection_path = self._collection_path(collection_name)
    if collection_path is not None:
        os.makedirs(collection_path, exist_ok=True)

    collection = LocalCollection(
        rest_models.CreateCollection(
            vectors=vectors_config,
            sparse_vectors=sparse_vectors_config,
        ),
        location=collection_path,
        force_disable_check_same_thread=self.force_disable_check_same_thread,
    )
    self.collections[collection_name] = collection

    if from_collection_name:
        # Copy page by page with one uniform page size (the first page used to be
        # fetched separately with a stray limit of 2).
        batch_size = 100
        next_offset = None
        while True:
            records, next_offset = self.scroll(
                from_collection_name, offset=next_offset, limit=batch_size, with_vectors=True
            )
            # it is not crucial to replace upload_records here since it is an internal
            # usage, and we don't have custom shard keys in qdrant local
            self.upload_records(collection_name, records)
            if next_offset is None:
                break

    self._save()
    return True
def recreate_collection(
    self,
    collection_name: str,
    vectors_config: Union[types.VectorParams, Mapping[str, types.VectorParams]],
    init_from: Optional[types.InitFrom] = None,
    sparse_vectors_config: Optional[Mapping[str, types.SparseVectorParams]] = None,
    **kwargs: Any,
) -> bool:
    """Delete the collection (if any) and create it again with the given configuration."""
    self.delete_collection(collection_name)
    created = self.create_collection(
        collection_name, vectors_config, init_from, sparse_vectors_config
    )
    return created
def upload_points(
    self, collection_name: str, points: Iterable[types.PointStruct], **kwargs: Any
) -> None:
    """Upload `points` into the named collection; extra keyword arguments are ignored."""
    self._upload_points(collection_name=collection_name, points=points)
def upload_records(
    self, collection_name: str, records: Iterable[types.Record], **kwargs: Any
) -> None:
    """
    Upload `records` into the named collection.

    In local mode this behaves like upload_records with wait=True in server mode.
    """
    self._upload_points(collection_name=collection_name, points=records)
def _upload_points(
    self,
    collection_name: str,
    points: Iterable[Union[types.PointStruct, types.Record]],
) -> None:
    """
    Convert points or records to point structs and upsert them in one call.

    A falsy vector or payload is replaced by an empty dict.
    """
    collection = self._get_collection(collection_name)
    structs = []
    for item in points:
        structs.append(
            rest_models.PointStruct(
                id=item.id,
                vector=item.vector or {},
                payload=item.payload or {},
            )
        )
    collection.upsert(structs)
def upload_collection(
    self,
    collection_name: str,
    vectors: Union[
        Dict[str, types.NumpyArray], types.NumpyArray, Iterable[types.VectorStruct]
    ],
    payload: Optional[Iterable[Dict[Any, Any]]] = None,
    ids: Optional[Iterable[types.PointId]] = None,
    **kwargs: Any,
) -> None:
    """
    Upload vectors, with optional payloads and ids, into the named collection.

    In local mode this behaves like upload_collection with wait=True in server mode.

    Missing ids are replaced by random UUID strings and missing payloads by
    empty dicts. A dict of numpy arrays (named vectors) is first converted to a
    list of per-point dicts; all arrays must have the same number of rows.
    """
    collection = self._get_collection(collection_name)

    if isinstance(vectors, dict) and any(isinstance(v, np.ndarray) for v in vectors.values()):
        row_counts = {arr.shape[0] for arr in vectors.values()}
        assert len(row_counts) == 1, "Each named vector should have the same number of vectors"
        num_vectors = next(iter(vectors.values())).shape[0]
        named_arrays = vectors
        # convert Dict[str, np.ndarray] to List[Dict[str, List[float]]]
        vectors = [
            {name: named_arrays[name][row].tolist() for name in named_arrays}
            for row in range(num_vectors)
        ]

    # Endless stream of fresh UUID strings, used when no ids are supplied
    # (the None sentinel never matches a string).
    id_source = ids or iter(lambda: str(uuid4()), None)
    vector_source = iter(vectors)
    payload_source = payload or itertools.cycle([{}])

    structs = []
    for point_id, vector, point_payload in zip(id_source, vector_source, payload_source):
        if isinstance(vector, np.ndarray):
            vector = vector.tolist()
        structs.append(
            rest_models.PointStruct(
                id=point_id,
                vector=vector or {},
                payload=point_payload or {},
            )
        )
    collection.upsert(structs)
def create_payload_index(
    self,
    collection_name: str,
    field_name: str,
    field_schema: Optional[types.PayloadSchemaType] = None,
    field_type: Optional[types.PayloadSchemaType] = None,
    **kwargs: Any,
) -> types.UpdateResult:
    """Log a warning (payload indexes do nothing locally) and return the default update result."""
    message = "Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes."
    logging.warning(message)
    return self._default_update_result()
def delete_payload_index(
    self, collection_name: str, field_name: str, **kwargs: Any
) -> types.UpdateResult:
    """Log a warning (payload indexes do nothing locally) and return the default update result."""
    message = "Payload indexes have no effect in the local Qdrant. Please use server Qdrant if you need payload indexes."
    logging.warning(message)
    return self._default_update_result()
def list_snapshots(
    self, collection_name: str, **kwargs: Any
) -> List[types.SnapshotDescription]:
    """Return an empty list of collection snapshots, regardless of the arguments."""
    no_snapshots: List[types.SnapshotDescription] = []
    return no_snapshots
def create_snapshot(
    self, collection_name: str, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
    raise NotImplementedError(message)
def delete_snapshot(self, collection_name: str, snapshot_name: str, **kwargs: Any) -> bool:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
    raise NotImplementedError(message)
def list_full_snapshots(self, **kwargs: Any) -> List[types.SnapshotDescription]:
    """Return an empty list of full-storage snapshots, regardless of the arguments."""
    no_snapshots: List[types.SnapshotDescription] = []
    return no_snapshots
def create_full_snapshot(self, **kwargs: Any) -> types.SnapshotDescription:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
    raise NotImplementedError(message)
def delete_full_snapshot(self, snapshot_name: str, **kwargs: Any) -> bool:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
    raise NotImplementedError(message)
def recover_snapshot(self, collection_name: str, location: str, **kwargs: Any) -> bool:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need full snapshots."
    raise NotImplementedError(message)
def list_shard_snapshots(
    self, collection_name: str, shard_id: int, **kwargs: Any
) -> List[types.SnapshotDescription]:
    """Return an empty list of shard snapshots, regardless of the arguments."""
    no_snapshots: List[types.SnapshotDescription] = []
    return no_snapshots
def create_shard_snapshot(
    self, collection_name: str, shard_id: int, **kwargs: Any
) -> Optional[types.SnapshotDescription]:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
    raise NotImplementedError(message)
def delete_shard_snapshot(
    self, collection_name: str, shard_id: int, snapshot_name: str, **kwargs: Any
) -> bool:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
    raise NotImplementedError(message)
def recover_shard_snapshot(
    self,
    collection_name: str,
    shard_id: int,
    location: str,
    **kwargs: Any,
) -> bool:
    """Always raise NotImplementedError: snapshots are not supported locally."""
    message = "Snapshots are not supported in the local Qdrant. Please use server Qdrant if you need snapshots."
    raise NotImplementedError(message)
def lock_storage(self, reason: str, **kwargs: Any) -> types.LocksOption:
    """
    Always raise NotImplementedError: storage locks are not supported locally.

    The message now points at locks; it previously mentioned full snapshots,
    a copy-paste from the snapshot methods.
    """
    raise NotImplementedError(
        "Locks are not supported in the local Qdrant. Please use server Qdrant if you need locks."
    )
def unlock_storage(self, **kwargs: Any) -> types.LocksOption:
    """
    Always raise NotImplementedError: storage locks are not supported locally.

    The message now points at locks; it previously mentioned full snapshots,
    a copy-paste from the snapshot methods.
    """
    raise NotImplementedError(
        "Locks are not supported in the local Qdrant. Please use server Qdrant if you need locks."
    )
def get_locks(self, **kwargs: Any) -> types.LocksOption:
    """Return a locks option with `write=False` and no error message."""
    unlocked = types.LocksOption(write=False, error_message=None)
    return unlocked
def create_shard_key(
    self,
    collection_name: str,
    shard_key: types.ShardKey,
    shards_number: Optional[int] = None,
    replication_factor: Optional[int] = None,
    placement: Optional[List[int]] = None,
    **kwargs: Any,
) -> bool:
    """Always raise NotImplementedError: sharding is not supported locally."""
    message = "Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
    raise NotImplementedError(message)
def delete_shard_key(
    self,
    collection_name: str,
    shard_key: types.ShardKey,
    **kwargs: Any,
) -> bool:
    """Always raise NotImplementedError: sharding is not supported locally."""
    message = "Sharding is not supported in the local Qdrant. Please use server Qdrant if you need sharding."
    raise NotImplementedError(message)