@@ 5,7 5,7 @@ import json
import re
import sqlite3
import sys
-
+from typing import Any, Dict, List, Optional, Union
ASCENDING = False
DESCENDING = True
@@ 21,8 21,8 @@ class MalformedDocument(Exception):
class Connection:
"""
- The high-level connection to a sqlite database. Creating a connection accepts
- the same args and keyword args as the ``sqlite3.connect`` method
+ The high-level connection to a sqlite database. Creating a connection
+ accepts the same args and keyword args as the ``sqlite3.connect`` method
"""
def __init__(self, *args, **kwargs):
@@ 31,12 31,12 @@ class Connection:
def connect(self, *args, **kwargs):
"""
- Connect to a sqlite database only if no connection exists. Isolation level
- for the connection is automatically set to autocommit
+ Connect to a sqlite database only if no connection exists. Isolation
+ level for the connection is automatically set to autocommit
"""
self.db = sqlite3.connect(*args, **kwargs)
self.db.isolation_level = None
- self.db.execute("PRAGMA journal_mode=WAL") # Set the journal mode to WAL
+ self.db.execute("PRAGMA journal_mode=WAL") # Set WAL journal mode
def close(self):
"""
@@ 47,31 47,32 @@ class Connection:
self.db.commit()
self.db.close()
- def __getitem__(self, name):
+ def __getitem__(self, name: str) -> "Collection":
"""
- A pymongo-like behavior for dynamically obtaining a collection of documents
+ A pymongo-like behavior for dynamically obtaining a collection of
+ documents
"""
if name not in self._collections:
self._collections[name] = Collection(self.db, name)
return self._collections[name]
- def __getattr__(self, name):
+ def __getattr__(self, name: str) -> Any:
if name in self.__dict__:
return self.__dict__[name]
return self[name]
- def __enter__(self):
+ def __enter__(self) -> "Connection":
return self
- def __exit__(self, exc_type, exc_val, exc_traceback):
+ def __exit__(self, exc_type, exc_val, exc_traceback) -> bool:
self.close()
return False
- def drop_collection(self, name):
+ def drop_collection(self, name: str):
"""
Drops a collection permanently if it exists
"""
- self.db.execute("drop table if exists %s" % name)
+ self.db.execute(f"DROP TABLE IF EXISTS {name}")
class Collection:
@@ 79,7 80,7 @@ class Collection:
A virtual database table that holds JSON-type documents
"""
- def __init__(self, db, name, create=True):
+ def __init__(self, db: sqlite3.Connection, name: str, create: bool = True):
self.db = db
self.name = name
@@ 88,7 89,7 @@ class Collection:
def begin(self):
if not self.db.in_transaction:
- self.db.execute('begin')
+ self.db.execute("BEGIN")
def commit(self):
if self.db.in_transaction:
@@ 102,18 103,18 @@ class Collection:
"""
Clears all stored documents in this database. THERE IS NO GOING BACK
"""
- self.db.execute("delete from %s" % self.name)
+ self.db.execute(f"DELETE FROM {self.name}")
- def exists(self):
+ def exists(self) -> bool:
"""
Checks if this collection exists
"""
- return self._object_exists('table', self.name)
+ return self._object_exists("table", self.name)
- def _object_exists(self, type, name):
+ def _object_exists(self, type: str, name: str) -> bool:
row = self.db.execute(
- "select count(1) from sqlite_master where type = ? and name = ?",
- (type, name.strip('[]'))
+ "SELECT COUNT(1) FROM sqlite_master WHERE type = ? AND name = ?",
+ (type, name.strip("[]")),
).fetchone()
return int(row[0]) > 0
@@ 122,47 123,58 @@ class Collection:
"""
Creates the collections database only if it does not already exist
"""
- self.db.execute("""
- create table if not exists %s (
- id integer primary key autoincrement,
- data text not null
- )
- """ % self.name)
+ self.db.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {self.name} (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ data TEXT NOT NULL
+ )"""
+ )
- def insert(self, document):
+ def insert(self, document: Dict[str, Any]) -> Dict[str, Any]:
"""
- Inserts a document into this collection. If a document already has an '_id'
- value it will be updated
+ Inserts a document into this collection. If a document already has an
+ '_id' value it will be updated
:returns: inserted document with id
"""
- if '_id' in document:
+ if "_id" in document:
return self.save(document)
# Check if document is a dict
if not isinstance(document, dict):
- raise MalformedDocument('document must be a dictionary, not a %s' % type(document))
+ raise MalformedDocument(
+ f"document must be a dictionary, not a {type(document)}"
+ )
# Create it and return a modified one with the id
- cursor = self.db.execute("""
- insert into %s(data) values (?)
- """ % self.name, (json.dumps(document),))
+ cursor = self.db.execute(
+ f"INSERT INTO {self.name}(data) VALUES (?)",
+ (json.dumps(document),),
+ )
- document['_id'] = cursor.lastrowid
+ document["_id"] = cursor.lastrowid
try:
- [self.reindex(table=index, documents=[document])
- for index in self.list_indexes()]
+ [
+ self.reindex(table=index, documents=[document])
+ for index in self.list_indexes()
+ ]
except sqlite3.IntegrityError as ie:
- self.delete_one({'_id': document['_id']})
+ self.delete_one({"_id": document["_id"]})
raise ie
return document
- def update(self, spec, document, upsert=False, hint=None):
+ def update(
+ self,
+ spec: Dict[str, Any],
+ document: Dict[str, Any],
+ upsert: bool = False,
+ hint: Optional[str] = None,
+ ) -> Optional[Dict[str, Any]]:
"""
DEPRECATED in pymongo
Updates a document stored in this collection.
"""
- # spec = {'key': 'value'}
to_update = self.find(query=spec, skip=0, limit=1, hint=hint)
if to_update:
to_update = to_update[0]
@@ 171,43 183,53 @@ class Collection:
return self.insert(document)
return None
- _id = to_update['_id']
+ _id = to_update["_id"]
+
+ self.db.execute(
+ f"UPDATE {self.name} SET data = ? WHERE id = ?",
+ (json.dumps(document), _id),
+ )
- self.db.execute("""
- update %s set data = ? where id = ?
- """ % self.name, (json.dumps(document), _id))
-
- document['_id'] = _id
+ document["_id"] = _id
try:
- [self.reindex(table=index, documents=[document])
- for index in self.list_indexes()]
+ [
+ self.reindex(table=index, documents=[document])
+ for index in self.list_indexes()
+ ]
except sqlite3.IntegrityError as ie:
self.save(to_update)
raise ie
return document
- def _remove(self, document):
+ def _remove(self, document: Dict[str, Any]):
"""
- Removes a document from this collection. This will raise AssertionError if the
- document does not have an _id attribute
+ Removes a document from this collection. This will raise AssertionError
+ if the document does not have an _id attribute
"""
- assert '_id' in document, 'Document must have an id'
- self.db.execute("delete from %s where id = ?" % self.name, (document['_id'],))
+ assert "_id" in document, "Document must have an id"
+ self.db.execute(
+ f"DELETE FROM {self.name} WHERE id = ?",
+ (document["_id"],)
+ )
- def save(self, document):
+ def save(self, document: Dict[str, Any]) -> Dict[str, Any]:
"""
Alias for ``update`` with upsert=True
"""
- return self.update({'_id': document.pop('_id', None)}, document, upsert=True)
+ return self.update(
+ {"_id": document.pop("_id", None)}, document, upsert=True
+ )
- def delete(self, document):
+ def delete(self, document: Dict[str, Any]):
"""
DEPRECATED
Alias for ``remove``
"""
return self._remove(document)
- def delete_one(self, filter, hint=None):
+ def delete_one(
+ self, filter: Dict[str, Any], hint: Optional[str] = None
+ ) -> Optional[Dict[str, Any]]:
"""
Delete only the first document according the filter
Params:
@@ 219,23 241,30 @@ class Collection:
return None
return self._remove(document)
- def _load(self, id, data):
+ def _load(self, id: int, data: Union[str, bytes]) -> Dict[str, Any]:
"""
Loads a JSON document taking care to apply the document id
"""
if isinstance(data, bytes): # pragma: no cover Python >= 3.0
- data = data.decode('utf-8')
+ data = data.decode("utf-8")
document = json.loads(data)
- document['_id'] = id
+ document["_id"] = id
return document
- def __get_val(self, item, key):
- for k in key.split('.'):
+ def __get_val(self, item: Dict[str, Any], key: str) -> Any:
+ for k in key.split("."):
item = item.get(k)
return item
- def find(self, query=None, skip=None, limit=None, hint=None, sort=None):
+ def find(
+ self,
+ query: Optional[Dict[str, Any]] = None,
+ skip: Optional[int] = None,
+ limit: Optional[int] = None,
+ hint: Optional[str] = None,
+ sort: Optional[Dict[str, bool]] = None,
+ ) -> List[Dict[str, Any]]:
"""
Returns a list of documents in this collection that match a given query
"""
@@ 244,26 273,30 @@ class Collection:
if skip is None:
skip = 0
- index_name = ''
- where = ''
+ index_name = ""
+ where = ""
if hint:
keys = self.__table_name_as_keys(hint)
index_name = hint
else:
- keys = [key.replace('.', '_')
- for key in query
- if not key.startswith('$')]
+ keys = [
+ key.replace(".", "_")
+ for key in query
+ if not key.startswith("$")
+ ]
if keys:
- index_name = '[%s{%s}]' % (self.name, ','.join(keys))
+ index_name = f'[{self.name}{{{",".join(keys)}}}]'
if index_name in self.list_indexes():
- index_query = ' and '.join(
- ["%s='%s'" % (key, json.dumps(query[key.replace('_', '.')]))
- for key in keys]
+ index_query = " AND ".join(
+ [
+ f"{key}='{json.dumps(query[key.replace('_', '.')])}'"
+ for key in keys
+ ]
)
- where = 'where id in (select id from %s where %s)' % (
- index_name, index_query
+ where = (
+ f"WHERE id IN (SELECT id FROM {index_name} WHERE {index_query})"
)
- cmd = "select id, data from %s %s" % (self.name, where)
+ cmd = f"SELECT id, data FROM {self.name} {where}"
cursor = self.db.execute(cmd)
apply = partial(self._apply_query, query)
@@ 286,17 319,20 @@ class Collection:
return results[:limit] if isinstance(limit, int) else results
- def _apply_query(self, query, document):
+ def _apply_query(
+ self, query: Dict[str, Any], document: Dict[str, Any]
+ ) -> bool:
"""
- Applies a query to a document. Returns True if the document meets the criteria of
- the supplied query. The ``query`` argument generally follows mongodb style syntax
- and consists of the following logical checks and operators.
+ Applies a query to a document. Returns True if the document meets the
+ criteria of the supplied query. The ``query`` argument generally
+ follows mongodb style syntax and consists of the following logical
+ checks and operators.
Logical: $and, $or, $nor, $not
Operators: $eq, $ne, $gt, $gte, $lt, $lte, $mod, $in, $nin, $all
- If no logical operator is supplied, it assumed that all field checks must pass. For
- example, these are equivalent:
+ If no logical operator is supplied, it assumed that all field checks
+ must pass. For example, these are equivalent:
{'foo': 'bar', 'baz': 'qux'}
{'$and': [{'foo': 'bar'}, {'baz': 'qux'}]}
@@ 322,28 358,30 @@ class Collection:
]
}
- In the previous example, this will return any document where the 'bar' key is equal
- to 'baz' and either the 'foo' key is an even number between 0 and 10 or is an odd number
- greater than 10.
+ In the previous example, this will return any document where the 'bar'
+ key is equal to 'baz' and either the 'foo' key is an even number
+ between 0 and 10 or is an odd number greater than 10.
"""
matches = [] # A list of booleans
reapply = lambda q: self._apply_query(q, document)
- for field, value in list(query.items()):
+ for field, value in query.items():
# A more complex query type $and, $or, etc
- if field == '$and':
+ if field == "$and":
matches.append(all(map(reapply, value)))
- elif field == '$or':
+ elif field == "$or":
matches.append(any(map(reapply, value)))
- elif field == '$nor':
+ elif field == "$nor":
matches.append(not any(map(reapply, value)))
- elif field == '$not':
+ elif field == "$not":
matches.append(not self._apply_query(value, document))
# Invoke a query operator
elif isinstance(value, dict):
- for operator, arg in list(value.items()):
- if not self._get_operator_fn(operator)(field, arg, document):
+ for operator, arg in value.items():
+ if not self._get_operator_fn(operator)(
+ field, arg, document
+ ):
matches.append(False)
break
else:
@@ 352,8 390,8 @@ class Collection:
# Standard
elif value != document.get(field, None):
# check if field contains a dot
- if '.' in field:
- nodes = field.split('.')
+ if "." in field:
+ nodes = field.split(".")
document_section = document
try:
@@ 372,23 410,32 @@ class Collection:
return all(matches)
- def _get_operator_fn(self, op):
+ def _get_operator_fn(self, op: str) -> Any:
"""
- Returns the function in this module that corresponds to an operator string.
- This simply checks if there is a method that handles the operator defined
- in this module, replacing '$' with '_' (i.e. if this module has a _gt
- method for $gt) and returns it. If no match is found, or the operator does not
- start with '$', a MalformedQueryException is raised
+ Returns the function in this module that corresponds to an operator
+ string. This simply checks if there is a method that handles the
+ operator defined in this module, replacing '$' with '_' (i.e. if this
+ module has a _gt method for $gt) and returns it. If no match is found,
+ or the operator does not start with '$', a MalformedQueryException is
+ raised.
"""
- if not op.startswith('$'):
- raise MalformedQueryException("Operator '%s' is not a valid query operation" % op)
+ if not op.startswith("$"):
+ raise MalformedQueryException(
+ f"Operator '{op}' is not a valid query operation"
+ )
try:
- return getattr(sys.modules[__name__], op.replace('$', '_'))
+ return getattr(sys.modules[__name__], op.replace("$", "_"))
except AttributeError:
- raise MalformedQueryException("Operator '%s' is not currently implemented" % op)
+ raise MalformedQueryException(
+ f"Operator '{op}' is not currently implemented"
+ )
- def find_one(self, query=None, hint=None):
+ def find_one(
+ self,
+ query: Optional[Dict[str, Any]] = None,
+ hint: Optional[str] = None,
+ ) -> Optional[Dict[str, Any]]:
"""
Equivalent to ``find(query, limit=1)[0]``
"""
@@ 397,9 444,15 @@ class Collection:
except (sqlite3.OperationalError, IndexError):
return None
- def find_and_modify(self, query=None, update=None, hint=None):
+ def find_and_modify(
+ self,
+ query: Optional[Dict[str, Any]] = None,
+ update: Optional[Dict[str, Any]] = None,
+ hint: Optional[str] = None,
+ ):
"""
- Finds documents in this collection that match a given query and updates them
+ Finds documents in this collection that match a given query and updates
+ them
"""
update = update or {}
@@ 407,71 460,79 @@ class Collection:
document.update(update)
self.save(document)
- def count(self, query=None, hint=None):
+ def count(
+ self,
+ query: Optional[Dict[str, Any]] = None,
+ hint: Optional[str] = None,
+ ) -> int:
"""
Equivalent to ``len(find(query))``
"""
return len(self.find(query=query, hint=hint))
- def rename(self, new_name):
+ def rename(self, new_name: str):
"""
Rename this collection
"""
new_collection = Collection(self.db, new_name, create=False)
assert not new_collection.exists()
- self.db.execute("alter table %s rename to %s" % (self.name, new_name))
+ self.db.execute(f"ALTER TABLE {self.name} RENAME TO {new_name}")
self.name = new_name
- def distinct(self, key):
+ def distinct(self, key: str) -> set:
"""
Get a set of distinct values for the given key excluding an implicit
None for documents that do not contain the key
"""
- return set(d[key] for d in [d for d in self.find() if key in d])
+ return {d[key] for d in self.find() if key in d}
- def create_index(self, key, reindex=True, sparse=False, unique=False):
+ def create_index(
+ self,
+ key: Union[str, List[str]],
+ reindex: bool = True,
+ sparse: bool = False,
+ unique: bool = False,
+ ):
"""
- Creates an index if it does not exist then performs a full reindex for this collection
+ Creates an index if it does not exist then performs a full reindex for
+ this collection
"""
if isinstance(key, (list, tuple)):
- index_name = ','.join(key)
- index_columns = ', '.join('%s text' % f for f in key)
+ index_name = ",".join(key)
+ index_columns = ", ".join(f"{f} text" for f in key)
else:
index_name = key
- index_columns = '%s text' % key
+ index_columns = f"{key} text"
# Allow dot notation, but save it as underscore
- index_name = index_name.replace('.', '_')
- index_columns = index_columns.replace('.', '_')
+ index_name = index_name.replace(".", "_")
+ index_columns = index_columns.replace(".", "_")
- table_name = '[%s{%s}]' % (self.name, index_name)
- reindex = reindex or not self._object_exists('table', table_name)
+ table_name = f"[{self.name}{{{index_name}}}]"
+ reindex = reindex or not self._object_exists("table", table_name)
# Create a table store for the index data
- self.db.execute("""
- create table if not exists {table} (
- id integer primary key,
- {columns},
- foreign key(id) references {collection}(id) on delete cascade on update cascade
+ self.db.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {table_name} (
+ id INTEGER PRIMARY KEY,
+ {index_columns},
+ FOREIGN KEY(id) REFERENCES {self.name}(id)
+ ON DELETE CASCADE
+ ON UPDATE CASCADE
)
- """.format(
- table=table_name,
- collection=self.name,
- columns=index_columns
- ))
+ """
+ )
# Create the index
- self.db.execute("""
- create {unique} index
- if not exists [idx.{collection}{{index}}]
- on {table}({index})
- """.format(
- unique='unique' if unique else '',
- collection=self.name,
- index=index_name,
- table=table_name,
- ))
+ self.db.execute(
+ f"""
+ CREATE {'UNIQUE ' if unique else ''}INDEX
+ IF NOT EXISTS [idx.{self.name}{{{index_name}}}]
+ ON {table_name}({index_name})
+ """
+ )
if reindex:
try:
@@ 480,36 541,42 @@ class Collection:
self.drop_index(table_name)
raise ie
- def ensure_index(self, key, sparse=False):
+ def ensure_index(self, key: Union[str, List[str]], sparse: bool = False):
"""
Equivalent to ``create_index(key, reindex=False)``
"""
self.create_index(key, reindex=False, sparse=False)
- def __table_name_as_keys(self, table):
- return re.findall(r'^\[.*\{(.*)\}\]$', table)[0].split(',')
+ def __table_name_as_keys(self, table: str) -> List[str]:
+ return re.findall(r"^\[.*\{(.*)\}\]$", table)[0].split(",")
- def reindex(self, table, sparse=False, documents=None):
+ def reindex(
+ self,
+ table: str,
+ sparse: bool = False,
+ documents: Optional[List[Dict[str, Any]]] = None,
+ ):
index = self.__table_name_as_keys(table)
- update = "update {table} set {key} = ? where id = ?"
- insert = "insert into {table}({index},id) values({q},{_id})"
- delete = "delete from {table} where id = {_id}"
- count = "select count(1) from {table} where id = ?"
- qs = ('?,' * len(index)).rstrip(',')
+ update = "UPDATE {table} SET {key} = ? WHERE id = ?"
+ insert = "INSERT INTO {table}({index},id) VALUES({q},{_id})"
+ delete = "DELETE FROM {table} WHERE id = {_id}"
+ count = "SELECT COUNT(1) FROM {table} WHERE id = ?"
+ qs = ("?," * len(index)).rstrip(",")
- for document in (documents or self.find()):
- _id = document['_id']
+ for document in documents or self.find():
+ _id = document["_id"]
# Ensure there's a row before we update
row = self.db.execute(count.format(table=table), (_id,)).fetchone()
if int(row[0]) == 0:
self.db.execute(
insert.format(
- table=table, index=','.join(index), q=qs, _id=_id
- ), [None for x in index]
+ table=table, index=",".join(index), q=qs, _id=_id
+ ),
+ [None for _ in index],
)
for key in index:
doc = deepcopy(document)
- for k in key.split('_'):
+ for k in key.split("_"):
if isinstance(doc, dict):
# Ignore this document if it doesn't have the key
if k not in doc and sparse:
@@ 517,28 584,29 @@ class Collection:
doc = doc.get(k, None)
try:
self.db.execute(
- update.format(
- table=table, key=key
- ), (json.dumps(doc), _id)
+ update.format(table=table, key=key),
+ (json.dumps(doc), _id),
)
except sqlite3.IntegrityError as ie:
self.db.execute(delete.format(table=table, _id=_id))
raise ie
- def list_indexes(self, as_keys=False):
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
+ def list_indexes(self, as_keys: bool = False) -> List[str]:
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
if as_keys:
return [
- self.__table_name_as_keys('[{index}]'.format(index=t[0]))
+ self.__table_name_as_keys("[{index}]".format(index=t[0]))
for t in self.db.execute(cmd.format(name=self.name)).fetchall()
]
return [
- '[{index}]'.format(index=t[0])
+ "[{index}]".format(index=t[0])
for t in self.db.execute(cmd.format(name=self.name)).fetchall()
]
- def drop_index(self, index):
+ def drop_index(self, index: str):
cmd = "DROP TABLE {index}"
self.db.execute(cmd.format(index=index))
@@ 551,7 619,7 @@ class Collection:
# BELOW ARE OPERATIONS FOR LOOKUPS
# TypeErrors are caught specifically for python 3 compatibility
-def _eq(field, value, document):
+def _eq(field: str, value: Any, document: Dict[str, Any]) -> bool:
"""
Returns True if the value of a document field is equal to a given value
"""
@@ 561,7 629,7 @@ def _eq(field, value, document):
return False
-def _gt(field, value, document):
+def _gt(field: str, value: Any, document: Dict[str, Any]) -> bool:
"""
Returns True if the value of a document field is greater than a given value
"""
@@ 571,7 639,7 @@ def _gt(field, value, document):
return False
-def _lt(field, value, document):
+def _lt(field: str, value: Any, document: Dict[str, Any]) -> bool:
"""
Returns True if the value of a document field is less than a given value
"""
@@ 581,10 649,10 @@ def _lt(field, value, document):
return False
-def _gte(field, value, document):
+def _gte(field: str, value: Any, document: Dict[str, Any]) -> bool:
"""
- Returns True if the value of a document field is greater than or
- equal to a given value
+ Returns True if the value of a document field is greater than or equal to
+ a given value
"""
try:
return document.get(field, None) >= value
@@ 592,10 660,10 @@ def _gte(field, value, document):
return False
-def _lte(field, value, document):
+def _lte(field: str, value: Any, document: Dict[str, Any]) -> bool:
"""
- Returns True if the value of a document field is less than or
- equal to a given value
+ Returns True if the value of a document field is less than or equal to
+ a given value
"""
try:
return document.get(field, None) <= value
@@ 603,7 671,7 @@ def _lte(field, value, document):
return False
-def _all(field, value, document):
+def _all(field: str, value: List[Any], document: Dict[str, Any]) -> bool:
"""
Returns True if the value of document field contains all the values
specified by ``value``. If supplied value is not an iterable, a
@@ 623,9 691,9 @@ def _all(field, value, document):
return a.intersection(b) == a
-def _in(field, value, document):
+def _in(field: str, value: List[Any], document: Dict[str, Any]) -> bool:
"""
- Returns True if document[field] is in the interable value. If the
+ Returns True if document[field] is in the iterable value. If the
supplied value is not an iterable, then a MalformedQueryException is raised
"""
try:
@@ 636,16 704,16 @@ def _in(field, value, document):
return document.get(field, None) in values
-def _ne(field, value, document):
+def _ne(field: str, value: Any, document: Dict[str, Any]) -> bool:
"""
Returns True if the value of document[field] is not equal to a given value
"""
return document.get(field, None) != value
-def _nin(field, value, document):
+def _nin(field: str, value: List[Any], document: Dict[str, Any]) -> bool:
"""
- Returns True if document[field] is NOT in the interable value. If the
+ Returns True if document[field] is NOT in the iterable value. If the
supplied value is not an iterable, then a MalformedQueryException is raised
"""
try:
@@ 656,7 724,7 @@ def _nin(field, value, document):
return document.get(field, None) not in values
-def _mod(field, value, document):
+def _mod(field: str, value: List[int], document: Dict[str, Any]) -> bool:
"""
Performs a mod on a document field. Value must be a list or tuple with
two values divisor and remainder (i.e. [2, 0]). This will essentially
@@ 671,7 739,9 @@ def _mod(field, value, document):
try:
divisor, remainder = list(map(int, value))
except (TypeError, ValueError):
- raise MalformedQueryException("'$mod' must accept an iterable: [divisor, remainder]")
+ raise MalformedQueryException(
+ "'$mod' must accept an iterable: [divisor, remainder]"
+ )
try:
return int(document.get(field, None)) % divisor == remainder
@@ 679,10 749,10 @@ def _mod(field, value, document):
return False
-def _exists(field, value, document):
+def _exists(field: str, value: bool, document: Dict[str, Any]) -> bool:
"""
- Ensures a document has a given field or not. ``value`` must be either True or
- False, otherwise a MalformedQueryException is raised
+ Ensures a document has a given field or not. ``value`` must be either True
+ or False, otherwise a MalformedQueryException is raised
"""
if value not in (True, False):
raise MalformedQueryException("'$exists' must be supplied a boolean")
@@ 691,3 761,4 @@ def _exists(field, value, document):
return field in document
else:
return field not in document
+
@@ 1,103 1,94 @@
# coding: utf-8
import re
import sqlite3
-
-from mock import Mock, call, patch
+from unittest.mock import Mock, call, patch
from pytest import fixture, mark, raises
-
import nosqlite
@fixture(scope="module")
-def db(request):
- _db = sqlite3.connect(':memory:')
+def db(request) -> sqlite3.Connection:
+ _db = sqlite3.connect(":memory:")
request.addfinalizer(_db.close)
return _db
@fixture(scope="module")
-def collection(db, request):
- return nosqlite.Collection(db, 'foo', create=False)
+def collection(db: sqlite3.Connection, request) -> nosqlite.Collection:
+ return nosqlite.Collection(db, "foo", create=False)
class TestConnection:
-
def test_connect(self):
- conn = nosqlite.Connection(':memory:')
+ conn = nosqlite.Connection(":memory:")
assert conn.db.isolation_level is None
- @patch('nosqlite.sqlite3')
+ @patch("nosqlite.sqlite3")
def test_context_manager_closes_connection(self, sqlite):
with nosqlite.Connection() as conn:
pass
-
assert conn.db.close.called
- @patch('nosqlite.sqlite3')
- @patch('nosqlite.Collection')
+ @patch("nosqlite.sqlite3")
+ @patch("nosqlite.Collection")
def test_getitem_returns_collection(self, mock_collection, sqlite):
sqlite.connect.return_value = sqlite
mock_collection.return_value = mock_collection
conn = nosqlite.Connection()
+ assert "foo" not in conn._collections
+ assert conn["foo"] == mock_collection
- assert 'foo' not in conn._collections
- assert conn['foo'] == mock_collection
-
- @patch('nosqlite.sqlite3')
+ @patch("nosqlite.sqlite3")
def test_getitem_returns_cached_collection(self, sqlite):
conn = nosqlite.Connection()
- conn._collections['foo'] = 'bar'
+ conn._collections["foo"] = "bar"
+ assert conn["foo"] == "bar"
- assert conn['foo'] == 'bar'
-
- @patch('nosqlite.sqlite3')
+ @patch("nosqlite.sqlite3")
def test_drop_collection(self, sqlite):
conn = nosqlite.Connection()
- conn.drop_collection('foo')
+ conn.drop_collection("foo")
+ conn.db.execute.assert_called_with("DROP TABLE IF EXISTS foo")
- conn.db.execute.assert_called_with('drop table if exists foo')
-
- @patch('nosqlite.sqlite3')
+ @patch("nosqlite.sqlite3")
def test_getattr_returns_attribute(self, sqlite):
conn = nosqlite.Connection()
+ assert conn.__getattr__("db") in list(conn.__dict__.values())
- assert conn.__getattr__('db') in list(conn.__dict__.values())
-
- @patch('nosqlite.sqlite3')
+ @patch("nosqlite.sqlite3")
def test_getattr_returns_collection(self, sqlite):
conn = nosqlite.Connection()
- foo = conn.__getattr__('foo')
-
+ foo = conn.__getattr__("foo")
assert foo not in list(conn.__dict__.values())
assert isinstance(foo, nosqlite.Collection)
class TestCollection:
+ def setup_method(self):
+ self.db = sqlite3.connect(":memory:")
+ self.collection = nosqlite.Collection(self.db, "foo", create=False)
- def setup(self):
- self.db = sqlite3.connect(':memory:')
- self.collection = nosqlite.Collection(self.db, 'foo', create=False)
-
- def teardown(self):
+ def teardown_method(self):
self.db.close()
- def unformat_sql(self, sql):
- return re.sub(r'[\s]+', ' ', sql.strip().replace('\n', ''))
+ def unformat_sql(self, sql: str) -> str:
+ return re.sub(r"[\s]+", " ", sql.strip().replace("\n", ""))
def test_create(self):
- collection = nosqlite.Collection(Mock(), 'foo', create=False)
+ collection = nosqlite.Collection(Mock(), "foo", create=False)
collection.create()
- collection.db.execute.assert_any_call("""
- create table if not exists foo (
- id integer primary key autoincrement,
- data text not null
- )
- """)
+ collection.db.execute.assert_any_call(
+ """
+ CREATE TABLE IF NOT EXISTS foo (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ data TEXT NOT NULL
+ )"""
+ )
def test_clear(self):
- collection = nosqlite.Collection(Mock(), 'foo')
+ collection = nosqlite.Collection(Mock(), "foo")
collection.clear()
- collection.db.execute.assert_any_call('delete from foo')
+ collection.db.execute.assert_any_call("DELETE FROM foo")
def test_exists_when_absent(self):
assert not self.collection.exists()
@@ 107,239 98,261 @@ class TestCollection:
assert self.collection.exists()
def test_insert_actually_save(self):
- doc = {'_id': 1, 'foo': 'bar'}
-
+ doc = {"_id": 1, "foo": "bar"}
self.collection.save = Mock()
self.collection.insert(doc)
self.collection.save.assert_called_with(doc)
def test_insert(self):
- doc = {'foo': 'bar'}
-
+ doc = {"foo": "bar"}
self.collection.create()
inserted = self.collection.insert(doc)
- assert inserted['_id'] == 1
+ assert inserted["_id"] == 1
def test_insert_non_dict_raise(self):
doc = "{'foo': 'bar'}"
-
self.collection.create()
with raises(nosqlite.MalformedDocument):
- inserted = self.collection.insert(doc)
+ self.collection.insert(doc)
def test_update_without_upsert(self):
- doc = {'foo': 'bar'}
-
+ doc = {"foo": "bar"}
self.collection.create()
updated = self.collection.update({}, doc)
assert updated is None
def test_update_with_upsert(self):
- doc = {'foo': 'bar'}
-
+ doc = {"foo": "bar"}
self.collection.create()
updated = self.collection.update({}, doc, upsert=True)
assert isinstance(updated, dict)
- assert updated['_id'] == 1
- assert updated['foo'] == doc['foo'] == 'bar'
+ assert updated["_id"] == 1
+ assert updated["foo"] == doc["foo"] == "bar"
def test_save_calls_update(self):
- with patch.object(self.collection, 'update'):
- doc = {'foo': 'bar'}
+ with patch.object(self.collection, "update"):
+ doc = {"foo": "bar"}
self.collection.save(doc)
self.collection.update.assert_called_with(
- {'_id':doc.pop('_id', None)},doc, upsert=True
+ {"_id": doc.pop("_id", None)}, doc, upsert=True
)
def test_save(self):
- doc = {'foo': 'bar'}
-
+ doc = {"foo": "bar"}
self.collection.create()
doc = self.collection.insert(doc)
- doc['foo'] = 'baz'
-
+ doc["foo"] = "baz"
updated = self.collection.save(doc)
- assert updated['foo'] == 'baz'
+ assert updated["foo"] == "baz"
def test_delete_calls_remove(self):
- with patch.object(self.collection, '_remove'):
- doc = {'foo': 'bar'}
+ with patch.object(self.collection, "_remove"):
+ doc = {"foo": "bar"}
self.collection.delete(doc)
self.collection._remove.assert_called_with(doc)
def test_remove_raises_when_no_id(self):
with raises(AssertionError):
- self.collection._remove({'foo': 'bar'})
+ self.collection._remove({"foo": "bar"})
def test_remove(self):
self.collection.create()
- doc = self.collection.insert({'foo': 'bar'})
- assert 1 == int(self.collection.db.execute("select count(1) from foo").fetchone()[0])
-
+ doc = self.collection.insert({"foo": "bar"})
+ assert 1 == int(
+ self.collection.db.execute("SELECT COUNT(1) FROM foo").fetchone()[0]
+ )
self.collection._remove(doc)
- assert 0 == int(self.collection.db.execute("select count(1) from foo").fetchone()[0])
+ assert 0 == int(
+ self.collection.db.execute("SELECT COUNT(1) FROM foo").fetchone()[0]
+ )
def test_delete_one(self):
self.collection.create()
- doc = {'foo':'bar'}
+ doc = {"foo": "bar"}
self.collection.insert(doc)
- assert 1 == int(self.collection.db.execute("select count(1) from foo").fetchone()[0])
-
+ assert 1 == int(
+ self.collection.db.execute("SELECT COUNT(1) FROM foo").fetchone()[0]
+ )
self.collection.delete_one(doc)
- assert 0 == int(self.collection.db.execute("select count(1) from foo").fetchone()[0])
-
+ assert 0 == int(
+ self.collection.db.execute("SELECT COUNT(1) FROM foo").fetchone()[0]
+ )
assert self.collection.delete_one(doc) is None
def test_insert_bulk_documents_on_a_transaction(self):
self.collection.create()
self.collection.begin()
- self.collection.save({'a':1, 'b':'c'})
- self.collection.save({'a':1, 'b':'a'})
+ self.collection.save({"a": 1, "b": "c"})
+ self.collection.save({"a": 1, "b": "a"})
self.collection.rollback()
- assert 0 == self.collection.count({'a':1})
+ assert 0 == self.collection.count({"a": 1})
self.collection.begin()
- self.collection.save({'a':1, 'b':'c'})
- self.collection.save({'a':1, 'b':'a'})
+ self.collection.save({"a": 1, "b": "c"})
+ self.collection.save({"a": 1, "b": "a"})
self.collection.commit()
- assert 2 == self.collection.count({'a':1})
+ assert 2 == self.collection.count({"a": 1})
def test_ensure_index(self):
self.collection.create()
- doc = {'foo':'bar'}
+ doc = {"foo": "bar"}
self.collection.insert(doc)
- self.collection.ensure_index('foo')
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- assert index_name == self.collection.db.execute(
- cmd.format(name=self.collection.name)
- ).fetchone()[0]
+ self.collection.ensure_index("foo")
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
+ index_name = f"{self.collection.name}{{foo}}"
+ assert (
+ index_name
+ == self.collection.db.execute(
+ cmd.format(name=self.collection.name)
+ ).fetchone()[0]
+ )
def test_create_index(self):
self.collection.create()
- doc = {'foo':'bar'}
+ doc = {"foo": "bar"}
self.collection.insert(doc)
- self.collection.create_index('foo', reindex=False)
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- assert index_name == self.collection.db.execute(
- cmd.format(name=self.collection.name)
- ).fetchone()[0]
+ self.collection.create_index("foo", reindex=False)
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
+ index_name = f"{self.collection.name}{{foo}}"
+ assert (
+ index_name
+ == self.collection.db.execute(
+ cmd.format(name=self.collection.name)
+ ).fetchone()[0]
+ )
def test_create_index_on_nested_keys(self):
self.collection.create()
- doc = {'foo':{'bar':'zzz'},'bok':'bak'}
+ doc = {"foo": {"bar": "zzz"}, "bok": "bak"}
self.collection.insert(doc)
- self.collection.insert({'a':1,'b':2})
- self.collection.create_index('foo.bar', reindex=True)
- index = '[%s{%s}]' % (self.collection.name, 'foo_bar')
+ self.collection.insert({"a": 1, "b": 2})
+ self.collection.create_index("foo.bar", reindex=True)
+ index = f"[{self.collection.name}{{foo_bar}}]"
assert index in self.collection.list_indexes()
- self.collection.create_index(['foo_bar','bok'], reindex=True)
- index = '[%s{%s}]' % (self.collection.name, 'foo_bar,bok')
+ self.collection.create_index(["foo_bar", "bok"], reindex=True)
+ index = f"[{self.collection.name}{{foo_bar,bok}}]"
assert index in self.collection.list_indexes()
def test_index_on_nested_keys(self):
self.test_create_index_on_nested_keys()
- index_name = '%s{%s}' % (self.collection.name, 'foo_bar')
- cmd = ("SELECT id, foo_bar FROM [%s]" % index_name)
+ index_name = f"{self.collection.name}{{foo_bar}}"
+ cmd = f"SELECT id, foo_bar FROM [{index_name}]"
assert (1, '"zzz"') == self.collection.db.execute(cmd).fetchone()
-
- index_name = '%s{%s}' % (self.collection.name, 'foo_bar,bok')
- cmd = ("SELECT * FROM [%s]" % index_name)
- assert (1, '"zzz"', '"bak"') == self.collection.db.execute(cmd).fetchone()
+ index_name = f"{self.collection.name}{{foo_bar,bok}}"
+ cmd = f"SELECT * FROM [{index_name}]"
+ assert (1, '"zzz"', '"bak"') == self.collection.db.execute(
+ cmd
+ ).fetchone()
def test_reindex(self):
self.test_create_index()
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- self.collection.reindex('[%s]' % index_name)
- cmd = ("SELECT id, foo FROM [%s]" % index_name)
+ index_name = f"{self.collection.name}{{foo}}"
+ self.collection.reindex(f"[{index_name}]")
+ cmd = f"SELECT id, foo FROM [{index_name}]"
assert (1, '"bar"') == self.collection.db.execute(cmd).fetchone()
-
+
def test_insert_auto_index(self):
self.test_reindex()
- self.collection.insert({'foo':'baz'})
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- cmd = ("SELECT id, foo FROM [%s]" % index_name)
- assert (1, '"bar"') in self.collection.db.execute(cmd).fetchall()
- assert (2, '"baz"') in self.collection.db.execute(cmd).fetchall()
+ self.collection.insert({"foo": "baz"})
+ index_name = f"{self.collection.name}{{foo}}"
+ cmd = f"SELECT id, foo FROM [{index_name}]"
+ results = self.collection.db.execute(cmd).fetchall()
+ assert (1, '"bar"') in results
+ assert (2, '"baz"') in results
def test_create_compound_index(self):
self.collection.create()
- doc = {'foo':'bar', 'far':'boo'}
+ doc = {"foo": "bar", "far": "boo"}
self.collection.insert(doc)
- self.collection.create_index(('foo','far'))
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
- assert '%s{%s}' % (self.collection.name, 'foo,far') == self.collection.db.execute(
- cmd.format(name=self.collection.name)
- ).fetchone()[0]
+ self.collection.create_index(("foo", "far"))
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
+ assert (
+ f"{self.collection.name}{{foo,far}}"
+ == self.collection.db.execute(
+ cmd.format(name=self.collection.name)
+ ).fetchone()[0]
+ )
def test_create_unique_index(self):
self.collection.create()
- doc = {'foo':'bar'}
+ doc = {"foo": "bar"}
self.collection.insert(doc)
- self.collection.create_index('foo', reindex=False, unique=True)
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- assert index_name == self.collection.db.execute(
- cmd.format(name=self.collection.name)
- ).fetchone()[0]
+ self.collection.create_index("foo", reindex=False, unique=True)
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
+ index_name = f"{self.collection.name}{{foo}}"
+ assert (
+ index_name
+ == self.collection.db.execute(
+ cmd.format(name=self.collection.name)
+ ).fetchone()[0]
+ )
def test_reindex_unique_index(self):
self.test_create_unique_index()
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- self.collection.reindex('[%s]' % index_name)
- cmd = ("SELECT id, foo FROM [%s]" % index_name)
+ index_name = f"{self.collection.name}{{foo}}"
+ self.collection.reindex(f"[{index_name}]")
+ cmd = f"SELECT id, foo FROM [{index_name}]"
assert (1, '"bar"') == self.collection.db.execute(cmd).fetchone()
def test_uniqueness(self):
self.test_reindex_unique_index()
- doc = {'foo':'bar'}
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- cmd = ("SELECT id, foo FROM [%s]" % index_name)
- assert [(1, '"bar"')] == self.collection.db.execute(cmd).fetchall()
+ doc = {"foo": "bar"}
+ index_name = f"{self.collection.name}{{foo}}"
+ cmd = f"SELECT id, foo FROM [{index_name}]"
+ results = self.collection.db.execute(cmd).fetchall()
+ assert [(1, '"bar"')] == results
with raises(sqlite3.IntegrityError):
self.collection.insert(doc)
- assert [(1, '"bar"')] == self.collection.db.execute(cmd).fetchall()
+ assert [(1, '"bar"')] == results
def test_update_to_break_uniqueness(self):
self.test_uniqueness()
- doc = {'foo':'baz'}
+ doc = {"foo": "baz"}
self.collection.insert(doc)
- index_name = '%s{%s}' % (self.collection.name, 'foo')
- cmd = ("SELECT id, foo FROM [%s]" % index_name)
- assert [(1, '"bar"'),(3, '"baz"')] == self.collection.db.execute(cmd).fetchall()
- doc = {'foo':'bar', '_id':3}
+ index_name = f"{self.collection.name}{{foo}}"
+ cmd = f"SELECT id, foo FROM [{index_name}]"
+ results = self.collection.db.execute(cmd).fetchall()
+ assert [(1, '"bar"'), (3, '"baz"')] == results
+ doc = {"foo": "bar", "_id": 3}
with raises(sqlite3.IntegrityError):
self.collection.save(doc)
- assert [(1, '"bar"'),(3, '"baz"')] == self.collection.db.execute(cmd).fetchall()
+ assert [(1, '"bar"'), (3, '"baz"')] == results
def test_create_unique_index_on_non_unique_collection(self):
self.collection.create()
- self.collection.insert({'foo':'bar','a':1})
- self.collection.insert({'foo':'bar','a':2})
- assert 2 == self.collection.count({'foo':'bar'})
+ self.collection.insert({"foo": "bar", "a": 1})
+ self.collection.insert({"foo": "bar", "a": 2})
+ assert 2 == self.collection.count({"foo": "bar"})
with raises(sqlite3.IntegrityError):
- self.collection.create_index('foo', unique=True)
+ self.collection.create_index("foo", unique=True)
assert 0 == len(self.collection.list_indexes())
def test_hint_index(self):
self.collection.create()
- self.collection.insert({'foo':'bar','a':1})
- self.collection.insert({'foo':'bar','a':2})
- self.collection.insert({'fox':'baz','a':3})
- self.collection.insert({'fox':'bar','a':4})
- self.collection.create_index('foo')
+ self.collection.insert({"foo": "bar", "a": 1})
+ self.collection.insert({"foo": "bar", "a": 2})
+ self.collection.insert({"fox": "baz", "a": 3})
+ self.collection.insert({"fox": "bar", "a": 4})
+ self.collection.create_index("foo")
self.collection.db = Mock(wraps=self.db)
- docs_without_hint = self.collection.find({'foo':'bar', 'a':2})
- self.collection.db.execute.assert_any_call(
- 'select id, data from foo '
+ docs_without_hint = self.collection.find({"foo": "bar", "a": 2})
+ self.collection.db.execute.assert_any_call("SELECT id, data FROM foo ")
+ docs_with_hint = self.collection.find(
+ {"foo": "bar", "a": 2}, hint="[foo{foo}]"
)
- docs_with_hint = self.collection.find({'foo':'bar', 'a':2}, hint='[foo{foo}]')
self.collection.db.execute.assert_any_call(
- 'select id, data from foo where id in (select id from [foo{foo}] where foo=\'"bar"\')'
+ "SELECT id, data FROM foo WHERE id IN (SELECT id FROM [foo{foo}] WHERE foo='\"bar\"')"
)
assert docs_without_hint == docs_with_hint
@@ 347,126 360,152 @@ class TestCollection:
self.test_create_index()
assert isinstance(self.collection.list_indexes(), list)
assert isinstance(self.collection.list_indexes()[0], str)
- assert '[%s{%s}]' % (self.collection.name, 'foo') == self.collection.list_indexes()[0]
+ assert (
+ f"[{self.collection.name}{{foo}}]"
+ == self.collection.list_indexes()[0]
+ )
def test_list_indexes_as_keys(self):
self.test_create_index()
assert isinstance(self.collection.list_indexes(as_keys=True), list)
assert isinstance(self.collection.list_indexes(as_keys=True)[0], list)
- assert ['foo'] == self.collection.list_indexes(as_keys=True)[0]
+ assert ["foo"] == self.collection.list_indexes(as_keys=True)[0]
def test_drop_index(self):
self.test_create_index()
- index_name = '[%s{%s}]' % (self.collection.name, 'foo')
+ index_name = f"[{self.collection.name}{{foo}}]"
self.collection.drop_index(index_name)
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
- assert self.collection.db.execute(cmd.format(name=self.collection.name)).fetchone() is None
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
+ assert (
+ self.collection.db.execute(
+ cmd.format(name=self.collection.name)
+ ).fetchone()
+ is None
+ )
def test_drop_indexes(self):
self.test_create_index()
self.collection.drop_indexes()
- cmd = ("SELECT name FROM sqlite_master "
- "WHERE type='table' and name like '{name}{{%}}'")
- assert self.collection.db.execute(cmd.format(name=self.collection.name)).fetchone() is None
+ cmd = (
+ "SELECT name FROM sqlite_master "
+ "WHERE type='table' AND name LIKE '{name}{{%}}'"
+ )
+ assert (
+ self.collection.db.execute(
+ cmd.format(name=self.collection.name)
+ ).fetchone()
+ is None
+ )
def test_find_with_sort(self):
self.collection.create()
- self.collection.save({'a':1, 'b':'c'})
- self.collection.save({'a':1, 'b':'a'})
- self.collection.save({'a':5, 'b':'x'})
- self.collection.save({'a':3, 'b':'x'})
- self.collection.save({'a':4, 'b':'z'})
+ self.collection.save({"a": 1, "b": "c"})
+ self.collection.save({"a": 1, "b": "a"})
+ self.collection.save({"a": 5, "b": "x"})
+ self.collection.save({"a": 3, "b": "x"})
+ self.collection.save({"a": 4, "b": "z"})
assert [
- {'a':1, 'b':'c', '_id':1},
- {'a':1, 'b':'a', '_id':2},
- {'a':5, 'b':'x', '_id':3},
- {'a':3, 'b':'x', '_id':4},
- {'a':4, 'b':'z', '_id':5},
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 1, "b": "a", "_id": 2},
+ {"a": 5, "b": "x", "_id": 3},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 4, "b": "z", "_id": 5},
] == self.collection.find()
assert [
- {'a':1, 'b':'c', '_id':1},
- {'a':1, 'b':'a', '_id':2},
- {'a':3, 'b':'x', '_id':4},
- {'a':4, 'b':'z', '_id':5},
- {'a':5, 'b':'x', '_id':3},
- ] == self.collection.find(sort={'a':nosqlite.ASCENDING})
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 1, "b": "a", "_id": 2},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 4, "b": "z", "_id": 5},
+ {"a": 5, "b": "x", "_id": 3},
+ ] == self.collection.find(sort={"a": nosqlite.ASCENDING})
assert [
- {'a':1, 'b':'a', '_id':2},
- {'a':1, 'b':'c', '_id':1},
- {'a':5, 'b':'x', '_id':3},
- {'a':3, 'b':'x', '_id':4},
- {'a':4, 'b':'z', '_id':5},
- ] == self.collection.find(sort={'b':nosqlite.ASCENDING})
+ {"a": 1, "b": "a", "_id": 2},
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 5, "b": "x", "_id": 3},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 4, "b": "z", "_id": 5},
+ ] == self.collection.find(sort={"b": nosqlite.ASCENDING})
assert [
- {'a':5, 'b':'x', '_id':3},
- {'a':4, 'b':'z', '_id':5},
- {'a':3, 'b':'x', '_id':4},
- {'a':1, 'b':'c', '_id':1},
- {'a':1, 'b':'a', '_id':2},
- ] == self.collection.find(sort={'a':nosqlite.DESCENDING})
+ {"a": 5, "b": "x", "_id": 3},
+ {"a": 4, "b": "z", "_id": 5},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 1, "b": "a", "_id": 2},
+ ] == self.collection.find(sort={"a": nosqlite.DESCENDING})
assert [
- {'a':4, 'b':'z', '_id':5},
- {'a':5, 'b':'x', '_id':3},
- {'a':3, 'b':'x', '_id':4},
- {'a':1, 'b':'c', '_id':1},
- {'a':1, 'b':'a', '_id':2},
- ] == self.collection.find(sort={'b':nosqlite.DESCENDING})
+ {"a": 4, "b": "z", "_id": 5},
+ {"a": 5, "b": "x", "_id": 3},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 1, "b": "a", "_id": 2},
+ ] == self.collection.find(sort={"b": nosqlite.DESCENDING})
assert [
- {'a':1, 'b':'a', '_id':2},
- {'a':1, 'b':'c', '_id':1},
- {'a':3, 'b':'x', '_id':4},
- {'a':4, 'b':'z', '_id':5},
- {'a':5, 'b':'x', '_id':3},
- ] == self.collection.find(sort={'a':nosqlite.ASCENDING, 'b':nosqlite.ASCENDING})
+ {"a": 1, "b": "a", "_id": 2},
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 4, "b": "z", "_id": 5},
+ {"a": 5, "b": "x", "_id": 3},
+ ] == self.collection.find(
+ sort={"a": nosqlite.ASCENDING, "b": nosqlite.ASCENDING}
+ )
assert [
- {'a':5, 'b':'x', '_id':3},
- {'a':4, 'b':'z', '_id':5},
- {'a':3, 'b':'x', '_id':4},
- {'a':1, 'b':'a', '_id':2},
- {'a':1, 'b':'c', '_id':1},
- ] == self.collection.find(sort={'a':nosqlite.DESCENDING, 'b':nosqlite.ASCENDING})
+ {"a": 5, "b": "x", "_id": 3},
+ {"a": 4, "b": "z", "_id": 5},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 1, "b": "a", "_id": 2},
+ {"a": 1, "b": "c", "_id": 1},
+ ] == self.collection.find(
+ sort={"a": nosqlite.DESCENDING, "b": nosqlite.ASCENDING}
+ )
assert [
- {'a':5, 'b':'x', '_id':3},
- {'a':4, 'b':'z', '_id':5},
- {'a':3, 'b':'x', '_id':4},
- {'a':1, 'b':'c', '_id':1},
- {'a':1, 'b':'a', '_id':2},
- ] == self.collection.find(sort={'a':nosqlite.DESCENDING, 'b':nosqlite.DESCENDING})
+ {"a": 5, "b": "x", "_id": 3},
+ {"a": 4, "b": "z", "_id": 5},
+ {"a": 3, "b": "x", "_id": 4},
+ {"a": 1, "b": "c", "_id": 1},
+ {"a": 1, "b": "a", "_id": 2},
+ ] == self.collection.find(
+ sort={"a": nosqlite.DESCENDING, "b": nosqlite.DESCENDING}
+ )
def test_find_with_sort_on_nested_key(self):
self.collection.create()
- self.collection.save({'a':{'b':5}, 'c':'B'})
- self.collection.save({'a':{'b':9}, 'c':'A'})
- self.collection.save({'a':{'b':7}, 'c':'C'})
+ self.collection.save({"a": {"b": 5}, "c": "B"})
+ self.collection.save({"a": {"b": 9}, "c": "A"})
+ self.collection.save({"a": {"b": 7}, "c": "C"})
assert [
- {'a':{'b':5}, 'c':'B', '_id':1},
- {'a':{'b':7}, 'c':'C', '_id':3},
- {'a':{'b':9}, 'c':'A', '_id':2},
- ] == self.collection.find(sort={'a.b':nosqlite.ASCENDING})
+ {"a": {"b": 5}, "c": "B", "_id": 1},
+ {"a": {"b": 7}, "c": "C", "_id": 3},
+ {"a": {"b": 9}, "c": "A", "_id": 2},
+ ] == self.collection.find(sort={"a.b": nosqlite.ASCENDING})
assert [
- {'a':{'b':9}, 'c':'A', '_id':2},
- {'a':{'b':7}, 'c':'C', '_id':3},
- {'a':{'b':5}, 'c':'B', '_id':1},
- ] == self.collection.find(sort={'a.b':nosqlite.DESCENDING})
+ {"a": {"b": 9}, "c": "A", "_id": 2},
+ {"a": {"b": 7}, "c": "C", "_id": 3},
+ {"a": {"b": 5}, "c": "B", "_id": 1},
+ ] == self.collection.find(sort={"a.b": nosqlite.DESCENDING})
- @mark.parametrize('strdoc,doc', [
- ('{"foo": "bar"}', {'_id': 1, 'foo': 'bar'}),
- ('{"foo": "☃"}', {'_id': 1, 'foo': '☃'}),
- ])
+ @mark.parametrize(
+ "strdoc,doc",
+ [
+ ('{"foo": "bar"}', {"_id": 1, "foo": "bar"}),
+ ('{"foo": "☃"}', {"_id": 1, "foo": "☃"}),
+ ],
+ )
def test_load(self, strdoc, doc):
assert doc == self.collection._load(1, strdoc)
def test_find(self):
- query = {'foo': 'bar'}
+ query = {"foo": "bar"}
documents = [
- (1, {'foo': 'bar', 'baz': 'qux'}), # Will match
- (2, {'foo': 'bar', 'bar': 'baz'}), # Will match
- (2, {'foo': 'baz', 'bar': 'baz'}), # Will not match
- (3, {'baz': 'qux'}), # Will not match
+ (1, {"foo": "bar", "baz": "qux"}), # Will match
+ (2, {"foo": "bar", "bar": "baz"}), # Will match
+ (2, {"foo": "baz", "bar": "baz"}), # Will not match
+ (3, {"baz": "qux"}), # Will not match
]
- collection = nosqlite.Collection(Mock(), 'foo', create=False)
+ collection = nosqlite.Collection(Mock(), "foo", create=False)
collection.db.execute.return_value = collection.db
collection.db.fetchall.return_value = documents
collection._load = lambda id, data: data
@@ 475,15 514,15 @@ class TestCollection:
assert len(ret) == 2
def test_find_honors_limit(self):
- query = {'foo': 'bar'}
+ query = {"foo": "bar"}
documents = [
- (1, {'foo': 'bar', 'baz': 'qux'}), # Will match
- (2, {'foo': 'bar', 'bar': 'baz'}), # Will match
- (2, {'foo': 'baz', 'bar': 'baz'}), # Will not match
- (3, {'baz': 'qux'}), # Will not match
+ (1, {"foo": "bar", "baz": "qux"}), # Will match
+ (2, {"foo": "bar", "bar": "baz"}), # Will match
+ (2, {"foo": "baz", "bar": "baz"}), # Will not match
+ (3, {"baz": "qux"}), # Will not match
]
- collection = nosqlite.Collection(Mock(), 'foo', create=False)
+ collection = nosqlite.Collection(Mock(), "foo", create=False)
collection.db.execute.return_value = collection.db
collection.db.fetchall.return_value = documents
collection._load = lambda id, data: data
@@ 492,225 531,209 @@ class TestCollection:
assert len(ret) == 1
def test_apply_query_and_type(self):
- query = {'$and': [{'foo': 'bar'}, {'baz': 'qux'}]}
-
- assert self.collection._apply_query(query, {'foo': 'bar', 'baz': 'qux'})
- assert not self.collection._apply_query(query, {'foo': 'bar', 'baz': 'foo'})
+ query = {"$and": [{"foo": "bar"}, {"baz": "qux"}]}
+ assert self.collection._apply_query(query, {"foo": "bar", "baz": "qux"})
+ assert not self.collection._apply_query(
+ query, {"foo": "bar", "baz": "foo"}
+ )
def test_apply_query_or_type(self):
- query = {'$or': [{'foo': 'bar'}, {'baz': 'qux'}]}
-
- assert self.collection._apply_query(query, {'foo': 'bar', 'abc': 'xyz'})
- assert self.collection._apply_query(query, {'baz': 'qux', 'abc': 'xyz'})
- assert not self.collection._apply_query(query, {'abc': 'xyz'})
+ query = {"$or": [{"foo": "bar"}, {"baz": "qux"}]}
+ assert self.collection._apply_query(query, {"foo": "bar", "abc": "xyz"})
+ assert self.collection._apply_query(query, {"baz": "qux", "abc": "xyz"})
+ assert not self.collection._apply_query(query, {"abc": "xyz"})
def test_apply_query_not_type(self):
- query = {'$not': {'foo': 'bar'}}
-
- assert self.collection._apply_query(query, {'foo': 'baz'})
- assert not self.collection._apply_query(query, {'foo': 'bar'})
+ query = {"$not": {"foo": "bar"}}
+ assert self.collection._apply_query(query, {"foo": "baz"})
+ assert not self.collection._apply_query(query, {"foo": "bar"})
def test_apply_query_nor_type(self):
- query = {'$nor': [{'foo': 'bar'}, {'baz': 'qux'}]}
-
- assert self.collection._apply_query(query, {'foo': 'baz', 'baz': 'bar'})
- assert not self.collection._apply_query(query, {'foo': 'bar'})
- assert not self.collection._apply_query(query, {'baz': 'qux'})
- assert not self.collection._apply_query(query, {'foo': 'bar', 'baz': 'qux'})
+ query = {"$nor": [{"foo": "bar"}, {"baz": "qux"}]}
+ assert self.collection._apply_query(query, {"foo": "baz", "baz": "bar"})
+ assert not self.collection._apply_query(query, {"foo": "bar"})
+ assert not self.collection._apply_query(query, {"baz": "qux"})
+ assert not self.collection._apply_query(
+ query, {"foo": "bar", "baz": "qux"}
+ )
def test_apply_query_gt_operator(self):
- query = {'foo': {'$gt': 5}}
-
- assert self.collection._apply_query(query, {'foo': 10})
- assert not self.collection._apply_query(query, {'foo': 4})
+ query = {"foo": {"$gt": 5}}
+ assert self.collection._apply_query(query, {"foo": 10})
+ assert not self.collection._apply_query(query, {"foo": 4})
def test_apply_query_gte_operator(self):
- query = {'foo': {'$gte': 5}}
-
- assert self.collection._apply_query(query, {'foo': 5})
- assert not self.collection._apply_query(query, {'foo': 4})
+ query = {"foo": {"$gte": 5}}
+ assert self.collection._apply_query(query, {"foo": 5})
+ assert not self.collection._apply_query(query, {"foo": 4})
def test_apply_query_lt_operator(self):
- query = {'foo': {'$lt': 5}}
-
- assert self.collection._apply_query(query, {'foo': 4})
- assert not self.collection._apply_query(query, {'foo': 10})
+ query = {"foo": {"$lt": 5}}
+ assert self.collection._apply_query(query, {"foo": 4})
+ assert not self.collection._apply_query(query, {"foo": 10})
def test_apply_query_lte_operator(self):
- query = {'foo': {'$lte': 5}}
-
- assert self.collection._apply_query(query, {'foo': 5})
- assert not self.collection._apply_query(query, {'foo': 10})
+ query = {"foo": {"$lte": 5}}
+ assert self.collection._apply_query(query, {"foo": 5})
+ assert not self.collection._apply_query(query, {"foo": 10})
def test_apply_query_eq_operator(self):
- query = {'foo': {'$eq': 5}}
-
- assert self.collection._apply_query(query, {'foo': 5})
- assert not self.collection._apply_query(query, {'foo': 4})
- assert not self.collection._apply_query(query, {'foo': 'bar'})
+ query = {"foo": {"$eq": 5}}
+ assert self.collection._apply_query(query, {"foo": 5})
+ assert not self.collection._apply_query(query, {"foo": 4})
+ assert not self.collection._apply_query(query, {"foo": "bar"})
def test_apply_query_in_operator(self):
- query = {'foo': {'$in': [1, 2, 3]}}
-
- assert self.collection._apply_query(query, {'foo': 1})
- assert not self.collection._apply_query(query, {'foo': 4})
- assert not self.collection._apply_query(query, {'foo': 'bar'})
+ query = {"foo": {"$in": [1, 2, 3]}}
+ assert self.collection._apply_query(query, {"foo": 1})
+ assert not self.collection._apply_query(query, {"foo": 4})
+ assert not self.collection._apply_query(query, {"foo": "bar"})
def test_apply_query_in_operator_raises(self):
- query = {'foo': {'$in': 5}}
-
+ query = {"foo": {"$in": 5}}
with raises(nosqlite.MalformedQueryException):
- self.collection._apply_query(query, {'foo': 1})
+ self.collection._apply_query(query, {"foo": 1})
def test_apply_query_nin_operator(self):
- query = {'foo': {'$nin': [1, 2, 3]}}
-
- assert self.collection._apply_query(query, {'foo': 4})
- assert self.collection._apply_query(query, {'foo': 'bar'})
- assert not self.collection._apply_query(query, {'foo': 1})
+ query = {"foo": {"$nin": [1, 2, 3]}}
+ assert self.collection._apply_query(query, {"foo": 4})
+ assert self.collection._apply_query(query, {"foo": "bar"})
+ assert not self.collection._apply_query(query, {"foo": 1})
def test_apply_query_nin_operator_raises(self):
- query = {'foo': {'$nin': 5}}
-
+ query = {"foo": {"$nin": 5}}
with raises(nosqlite.MalformedQueryException):
- self.collection._apply_query(query, {'foo': 1})
+ self.collection._apply_query(query, {"foo": 1})
def test_apply_query_ne_operator(self):
- query = {'foo': {'$ne': 5}}
-
- assert self.collection._apply_query(query, {'foo': 1})
- assert self.collection._apply_query(query, {'foo': 'bar'})
- assert not self.collection._apply_query(query, {'foo': 5})
+ query = {"foo": {"$ne": 5}}
+ assert self.collection._apply_query(query, {"foo": 1})
+ assert self.collection._apply_query(query, {"foo": "bar"})
+ assert not self.collection._apply_query(query, {"foo": 5})
def test_apply_query_all_operator(self):
- query = {'foo': {'$all': [1, 2, 3]}}
-
- assert self.collection._apply_query(query, {'foo': list(range(10))})
- assert not self.collection._apply_query(query, {'foo': ['bar', 'baz']})
- assert not self.collection._apply_query(query, {'foo': 3})
+ query = {"foo": {"$all": [1, 2, 3]}}
+ assert self.collection._apply_query(query, {"foo": list(range(10))})
+ assert not self.collection._apply_query(query, {"foo": ["bar", "baz"]})
+ assert not self.collection._apply_query(query, {"foo": 3})
def test_apply_query_all_operator_raises(self):
- query = {'foo': {'$all': 3}}
-
+ query = {"foo": {"$all": 3}}
with raises(nosqlite.MalformedQueryException):
- self.collection._apply_query(query, {'foo': 'bar'})
+ self.collection._apply_query(query, {"foo": "bar"})
def test_apply_query_mod_operator(self):
- query = {'foo': {'$mod': [2, 0]}}
-
- assert self.collection._apply_query(query, {'foo': 4})
- assert not self.collection._apply_query(query, {'foo': 3})
- assert not self.collection._apply_query(query, {'foo': 'bar'})
+ query = {"foo": {"$mod": [2, 0]}}
+ assert self.collection._apply_query(query, {"foo": 4})
+ assert not self.collection._apply_query(query, {"foo": 3})
+ assert not self.collection._apply_query(query, {"foo": "bar"})
def test_apply_query_mod_operator_raises(self):
- query = {'foo': {'$mod': 2}}
-
+ query = {"foo": {"$mod": 2}}
with raises(nosqlite.MalformedQueryException):
- self.collection._apply_query(query, {'foo': 5})
+ self.collection._apply_query(query, {"foo": 5})
def test_apply_query_honors_multiple_operators(self):
- query = {'foo': {'$gte': 0, '$lte': 10, '$mod': [2, 0]}}
-
- assert self.collection._apply_query(query, {'foo': 4})
- assert not self.collection._apply_query(query, {'foo': 3})
- assert not self.collection._apply_query(query, {'foo': 15})
- assert not self.collection._apply_query(query, {'foo': 'foo'})
+ query = {"foo": {"$gte": 0, "$lte": 10, "$mod": [2, 0]}}
+ assert self.collection._apply_query(query, {"foo": 4})
+ assert not self.collection._apply_query(query, {"foo": 3})
+ assert not self.collection._apply_query(query, {"foo": 15})
+ assert not self.collection._apply_query(query, {"foo": "foo"})
def test_apply_query_honors_logical_and_operators(self):
- # 'bar' must be 'baz', and 'foo' must be an even number 0-10 or an odd number > 10
+ # 'bar' must be 'baz', and 'foo' must be an even number 0-10
+ # or an odd number > 10
query = {
- 'bar': 'baz',
- '$or': [
- {'foo': {'$gte': 0, '$lte': 10, '$mod': [2, 0]}},
- {'foo': {'$gt': 10, '$mod': [2, 1]}},
- ]
+ "bar": "baz",
+ "$or": [
+ {"foo": {"$gte": 0, "$lte": 10, "$mod": [2, 0]}},
+ {"foo": {"$gt": 10, "$mod": [2, 1]}},
+ ],
}
- assert self.collection._apply_query(query, {'bar': 'baz', 'foo': 4})
- assert self.collection._apply_query(query, {'bar': 'baz', 'foo': 15})
- assert not self.collection._apply_query(query, {'bar': 'baz', 'foo': 14})
- assert not self.collection._apply_query(query, {'bar': 'qux', 'foo': 4})
+ assert self.collection._apply_query(query, {"bar": "baz", "foo": 4})
+ assert self.collection._apply_query(query, {"bar": "baz", "foo": 15})
+ assert not self.collection._apply_query(
+ query, {"bar": "baz", "foo": 14}
+ )
+ assert not self.collection._apply_query(query, {"bar": "qux", "foo": 4})
def test_apply_query_exists(self):
- query_exists = {'foo': {'$exists': True}}
- query_not_exists = {'foo': {'$exists': False}}
-
- assert self.collection._apply_query(query_exists, {'foo': 'bar'})
- assert self.collection._apply_query(query_not_exists, {'bar': 'baz'})
- assert not self.collection._apply_query(query_exists, {'baz': 'bar'})
- assert not self.collection._apply_query(query_not_exists, {'foo': 'bar'})
+ query_exists = {"foo": {"$exists": True}}
+ query_not_exists = {"foo": {"$exists": False}}
+ assert self.collection._apply_query(query_exists, {"foo": "bar"})
+ assert self.collection._apply_query(query_not_exists, {"bar": "baz"})
+ assert not self.collection._apply_query(query_exists, {"baz": "bar"})
+ assert not self.collection._apply_query(
+ query_not_exists, {"foo": "bar"}
+ )
def test_apply_query_exists_raises(self):
- query = {'foo': {'$exists': 'foo'}}
-
+ query = {"foo": {"$exists": "foo"}}
with raises(nosqlite.MalformedQueryException):
- self.collection._apply_query(query, {'foo': 'bar'})
+ self.collection._apply_query(query, {"foo": "bar"})
def test_get_operator_fn_improper_op(self):
with raises(nosqlite.MalformedQueryException):
- self.collection._get_operator_fn('foo')
+ self.collection._get_operator_fn("foo")
def test_get_operator_fn_valid_op(self):
- assert self.collection._get_operator_fn('$in') == nosqlite._in
+ assert self.collection._get_operator_fn("$in") == nosqlite._in
def test_get_operator_fn_no_op(self):
with raises(nosqlite.MalformedQueryException):
- self.collection._get_operator_fn('$foo')
+ self.collection._get_operator_fn("$foo")
def test_find_and_modify(self):
- update = {'foo': 'bar'}
+ update = {"foo": "bar"}
docs = [
- {'foo': 'foo'},
- {'baz': 'qux'},
+ {"foo": "foo"},
+ {"baz": "qux"},
]
- with patch.object(self.collection, 'find'):
- with patch.object(self.collection, 'save'):
+ with patch.object(self.collection, "find"):
+ with patch.object(self.collection, "save"):
self.collection.find.return_value = docs
self.collection.find_and_modify(update=update)
- self.collection.save.assert_has_calls([
- call({'foo': 'bar'}),
- call({'foo': 'bar', 'baz': 'qux'}),
- ])
+ self.collection.save.assert_has_calls(
+ [
+ call({"foo": "bar"}),
+ call({"foo": "bar", "baz": "qux"}),
+ ]
+ )
def test_count(self):
- with patch.object(self.collection, 'find'):
+ with patch.object(self.collection, "find"):
self.collection.find.return_value = list(range(10))
assert self.collection.count() == 10
def test_distinct(self):
- docs = [
- {'foo': 'bar'},
- {'foo': 'baz'},
- {'foo': 10},
- {'bar': 'foo'}
- ]
+ docs = [{"foo": "bar"}, {"foo": "baz"}, {"foo": 10}, {"bar": "foo"}]
self.collection.find = lambda: docs
-
- assert set(('bar', 'baz', 10)) == self.collection.distinct('foo')
+ assert set(("bar", "baz", 10)) == self.collection.distinct("foo")
def test_rename_raises_for_collision(self):
- nosqlite.Collection(self.db, 'bar') # Create a collision point
+ nosqlite.Collection(self.db, "bar") # Create a collision point
self.collection.create()
-
with raises(AssertionError):
- self.collection.rename('bar')
+ self.collection.rename("bar")
def test_rename(self):
self.collection.create()
assert self.collection.exists()
-
- self.collection.rename('bar')
- assert self.collection.name == 'bar'
+ self.collection.rename("bar")
+ assert self.collection.name == "bar"
assert self.collection.exists()
-
- assert not nosqlite.Collection(self.db, 'foo', create=False).exists()
+ assert not nosqlite.Collection(self.db, "foo", create=False).exists()
class TestFindOne:
-
- def test_returns_None_if_collection_does_not_exist(self, collection):
+ def test_returns_None_if_collection_does_not_exist(
+ self, collection: nosqlite.Collection
+ ):
assert collection.find_one({}) is None
- def test_returns_None_if_document_is_not_found(self, collection):
+ def test_returns_None_if_document_is_not_found(
+ self, collection: nosqlite.Collection
+ ):
collection.create()
assert collection.find_one({}) is None