import json
import sqlite3
from contextlib import closing
from contextlib import contextmanager
from datetime import datetime
from numbers import Integral
from numbers import Real
from typing import Any
from typing import Dict
from typing import Generator
from typing import Iterator
from typing import Optional
from typing import Union
from .datetime_utils import fromisoformat
[docs]
def ensure_table_query(table: str, field_sql_types: Dict[str, str]) -> str:
s = f"CREATE TABLE IF NOT EXISTS {table}"
if not field_sql_types:
return s
lst = [f"{k} {v}" for k, v in field_sql_types.items()]
columns = ", ".join(lst)
return f"{s} ({columns})"
[docs]
def insert_query(table: str, nfields: int):
values = ("?," * nfields)[:-1]
return f"INSERT INTO {table} VALUES({values})"
[docs]
def python_to_sql_type(value: Any) -> str:
if isinstance(value, (Integral, bool)):
return "INTEGER"
elif isinstance(value, Real):
return "REAL"
elif isinstance(value, (str, datetime)):
return "TEXT"
else:
return "BLOB"
[docs]
def python_to_sql_types(field_types: Optional[Dict]) -> dict:
if not field_types:
return dict()
return {k: python_to_sql_type(v) for k, v in field_types.items()}
[docs]
def serialize(value: Any, sql_type: Optional[str] = None):
if value is not None and sql_type is not None:
vsql_type = python_to_sql_type(value)
if sql_type != vsql_type:
raise TypeError(f"value {value} does not have SQL type {sql_type}")
if isinstance(value, (Integral, Real, bool, str)):
return value
elif isinstance(value, datetime):
return value.isoformat()
else:
return json.dumps(value).encode()
def _select_serialize(value: Any, sql_type: Optional[str] = None):
sql_value = serialize(value, sql_type)
if isinstance(sql_value, str):
return f"'{sql_value}'"
return sql_value
[docs]
def deserialize(sql_value, field_type: Optional[str] = None):
if isinstance(sql_value, bytes):
sql_value = sql_value.decode()
if sql_value == "null" or sql_value is None:
return None
elif isinstance(field_type, bool):
return bool(sql_value)
elif isinstance(field_type, (Integral, Real, str)):
return sql_value
elif isinstance(field_type, datetime):
return fromisoformat(sql_value)
else:
return json.loads(sql_value)
[docs]
def select(
conn,
table: str,
field_types: Optional[Dict] = None,
sql_types: Optional[Dict] = None,
starttime: Optional[Union[str, datetime]] = None,
endtime: Optional[Union[str, datetime]] = None,
**is_equal_filter,
) -> Iterator[dict]:
if is_equal_filter:
if sql_types is None:
sql_types = python_to_sql_types(field_types)
conditions = [
f"{k} = {_select_serialize(v, sql_types.get(k))}"
for k, v in is_equal_filter.items()
]
else:
conditions = list()
if starttime:
if isinstance(starttime, str):
starttime = fromisoformat(starttime)
conditions.append(f"time >= '{starttime.isoformat()}'")
if endtime:
if isinstance(endtime, str):
endtime = fromisoformat(endtime)
conditions.append(f"time <= '{endtime.isoformat()}'")
if conditions:
search_condition = " AND ".join(conditions)
query = f"SELECT * FROM {table} WHERE {search_condition}"
else:
query = f"SELECT * FROM {table}"
with closing(conn.cursor()) as cursor:
try:
cursor.execute(query)
except sqlite3.OperationalError as e:
if "no such table" in str(e):
return
raise # Re-raise other errors
rows = cursor.fetchall()
conn.commit()
if cursor.description is None:
return
fields = [col[0] for col in cursor.description]
if field_types is None:
field_types = {}
for values in rows:
yield {
k: deserialize(v, field_types.get(k)) for k, v in zip(fields, values)
}
[docs]
@contextmanager
def connect(*args, **kwargs) -> Generator[sqlite3.Connection, None, None]:
with closing(sqlite3.connect(*args, **kwargs)) as conn:
yield conn