# Source code for ewoksutils.sqlite3_utils

from numbers import Integral, Real
import json
from datetime import datetime
from typing import Any, Dict, Iterator, Union, Optional, Generator
import sqlite3
from contextlib import closing
from contextlib import contextmanager

from .datetime_utils import fromisoformat


def ensure_table_query(table: str, field_sql_types: Dict[str, str]) -> str:
    """Build a ``CREATE TABLE IF NOT EXISTS`` statement for ``table``.

    :param table: table name, interpolated verbatim into the SQL text.
    :param field_sql_types: maps column names to SQL type names; each pair
        becomes one ``<name> <type>`` column definition.
    :returns: the statement text. When ``field_sql_types`` is empty the
        statement is returned without a column list.
    """
    statement = f"CREATE TABLE IF NOT EXISTS {table}"
    if field_sql_types:
        column_defs = ", ".join(
            f"{name} {sql_type}" for name, sql_type in field_sql_types.items()
        )
        statement = f"{statement} ({column_defs})"
    return statement
def insert_query(table: str, nfields: int):
    """Build a positional ``INSERT`` statement with ``nfields`` placeholders.

    :param table: table name, interpolated verbatim into the SQL text.
    :param nfields: number of ``?`` placeholders to put in the ``VALUES`` list.
    :returns: the statement text, e.g. ``INSERT INTO t VALUES(?,?)``.
    """
    placeholders = ",".join(["?"] * nfields)
    return f"INSERT INTO {table} VALUES({placeholders})"
def python_to_sql_type(value: Any) -> str:
    """Return the SQL storage type name used for a Python value.

    :param value: any Python object; only its class is inspected.
    :returns: ``"INTEGER"`` for integral numbers and booleans, ``"REAL"`` for
        other real numbers, ``"TEXT"`` for strings and datetimes, and
        ``"BLOB"`` for everything else (including ``None``).
    """
    # Order matters: integral numbers are also real numbers, so the integer
    # rule has to be tried before the real rule.
    rules = (
        ((Integral, bool), "INTEGER"),
        (Real, "REAL"),
        ((str, datetime), "TEXT"),
    )
    for python_types, sql_type in rules:
        if isinstance(value, python_types):
            return sql_type
    return "BLOB"
def python_to_sql_types(field_types: Optional[Dict]) -> dict:
    """Map every field name to the SQL type name of its associated value.

    :param field_types: maps field names to Python values; each value is
        classified with :func:`python_to_sql_type`. May be ``None`` or empty.
    :returns: a new dict of field name to SQL type name (empty when
        ``field_types`` is ``None`` or empty).
    """
    sql_types = {}
    if field_types:
        for name, example in field_types.items():
            sql_types[name] = python_to_sql_type(example)
    return sql_types
def serialize(value: Any, sql_type: Optional[str] = None):
    """Convert a Python value into the representation stored in the database.

    :param value: the value to convert.
    :param sql_type: optional SQL type name the value is expected to map to
        (as given by :func:`python_to_sql_type`). Not checked when ``value``
        is ``None``.
    :returns: numbers, booleans and strings unchanged; datetimes as their ISO
        format string; anything else (including ``None``) as UTF-8 encoded
        JSON bytes.
    :raises TypeError: when ``sql_type`` is given and does not match the SQL
        type of ``value``.
    """
    needs_check = value is not None and sql_type is not None
    if needs_check:
        actual_sql_type = python_to_sql_type(value)
        if sql_type != actual_sql_type:
            raise TypeError(f"value {value} does not have SQL type {sql_type}")

    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (Integral, Real, bool, str)):
        return value
    # Everything else is stored as a JSON document in a BLOB.
    as_json = json.dumps(value)
    return as_json.encode()
def _select_serialize(value: Any, sql_type: Optional[str] = None):
    """Serialize a value into a form that can be embedded in SQL text.

    The value is first converted with :func:`serialize`, then rendered as an
    SQL literal where needed.

    :param value: the value to compare a column against.
    :param sql_type: optional expected SQL type name, forwarded to
        :func:`serialize`.
    :returns: a quoted SQL string literal for text, an ``X'..'`` BLOB literal
        for bytes, and the serialized value unchanged otherwise (numbers).
    :raises TypeError: propagated from :func:`serialize` on a type mismatch.
    """
    sql_value = serialize(value, sql_type)
    if isinstance(sql_value, str):
        # A single quote inside an SQL string literal is escaped by doubling
        # it; without this a value such as "O'Brien" breaks the statement.
        escaped = sql_value.replace("'", "''")
        return f"'{escaped}'"
    if isinstance(sql_value, bytes):
        # Formatting raw bytes into SQL text yields "b'...'", which is not
        # valid SQL. Use a hexadecimal BLOB literal instead.
        return f"X'{sql_value.hex()}'"
    return sql_value
def deserialize(sql_value, field_type: Optional[str] = None):
    """Convert a value read from the database back into a Python value.

    :param sql_value: the raw column value; bytes are decoded to text first.
    :param field_type: optional hint selecting the conversion. The code tests
        it with ``isinstance`` against ``bool``, numbers, ``str`` and
        ``datetime``, so it is expected to be an *instance* of the target
        type rather than the type itself.
    :returns: ``None`` for SQL ``NULL`` or the JSON text ``null``; a ``bool``
        for a boolean hint; the value unchanged for a numeric or string hint;
        a datetime for a datetime hint. Without a matching hint, text is
        decoded as JSON and non-text values are returned unchanged.
    :raises json.JSONDecodeError: when text without a matching hint is not
        valid JSON.
    """
    if isinstance(sql_value, bytes):
        sql_value = sql_value.decode()
    if sql_value is None or sql_value == "null":
        return None

    # bool must be tested before the numeric types: a bool is also Integral.
    if isinstance(field_type, bool):
        return bool(sql_value)
    if isinstance(field_type, (Integral, Real, str)):
        return sql_value
    if isinstance(field_type, datetime):
        return fromisoformat(sql_value)

    if not isinstance(sql_value, str):
        # INTEGER/REAL columns read without a type hint arrive as int/float.
        # json.loads() only accepts text and would raise TypeError on them,
        # so hand them back unchanged.
        return sql_value
    return json.loads(sql_value)
def select(
    conn,
    table: str,
    field_types: Optional[Dict] = None,
    sql_types: Optional[Dict] = None,
    starttime: Optional[Union[str, datetime]] = None,
    endtime: Optional[Union[str, datetime]] = None,
    **is_equal_filter,
) -> Iterator[dict]:
    """Yield the rows of ``table`` that match the given filters, as dicts.

    :param conn: database connection providing ``cursor()`` and ``commit()``.
    :param table: table name, interpolated verbatim into the SQL text
        (identifiers cannot be bound as parameters), so it must be trusted.
    :param field_types: maps column names to values used as type hints when
        deserializing each column (see :func:`deserialize`) and, when
        ``sql_types`` is not given, to derive the SQL types of filter values.
    :param sql_types: maps column names to SQL type names used to validate
        filter values; derived from ``field_types`` when ``None``.
    :param starttime: keep rows whose ``time`` column is at or after this
        moment; an ISO string or a datetime.
    :param endtime: keep rows whose ``time`` column is at or before this
        moment; an ISO string or a datetime.
    :param is_equal_filter: ``column=value`` equality conditions. Column
        names are interpolated verbatim; values are bound as parameters.
    :returns: an iterator of ``{column: value}`` dicts. Nothing is yielded
        when the table does not exist.
    :raises TypeError: when a filter value does not match its SQL type.
    :raises sqlite3.OperationalError: for any failure other than a missing
        table.
    """
    conditions = []
    parameters = []

    if is_equal_filter:
        if sql_types is None:
            sql_types = python_to_sql_types(field_types)
        for name, value in is_equal_filter.items():
            # Bind values instead of formatting them into the SQL text:
            # strings containing quotes and bytes values would otherwise
            # produce a broken (or injectable) statement.
            conditions.append(f"{name} = ?")
            parameters.append(serialize(value, sql_types.get(name)))

    if starttime:
        if isinstance(starttime, str):
            starttime = fromisoformat(starttime)
        conditions.append("time >= ?")
        parameters.append(starttime.isoformat())
    if endtime:
        if isinstance(endtime, str):
            endtime = fromisoformat(endtime)
        conditions.append("time <= ?")
        parameters.append(endtime.isoformat())

    query = f"SELECT * FROM {table}"
    if conditions:
        search_condition = " AND ".join(conditions)
        query = f"{query} WHERE {search_condition}"

    with closing(conn.cursor()) as cursor:
        try:
            cursor.execute(query, parameters)
        except sqlite3.OperationalError as e:
            # A table that was never created simply has no rows.
            if "no such table" in str(e):
                return
            raise  # Re-raise other errors
        rows = cursor.fetchall()
        conn.commit()
        if cursor.description is None:
            return
        fields = [col[0] for col in cursor.description]

    if field_types is None:
        field_types = {}
    for values in rows:
        yield {
            name: deserialize(raw, field_types.get(name))
            for name, raw in zip(fields, values)
        }
@contextmanager
def connect(*args, **kwargs) -> Generator[sqlite3.Connection, None, None]:
    """Open an SQLite connection that is closed when the ``with`` block exits.

    :param args: positional arguments forwarded to :func:`sqlite3.connect`.
    :param kwargs: keyword arguments forwarded to :func:`sqlite3.connect`.
    :returns: a context manager yielding the open connection. The connection
        is closed on exit, whether or not the block raised.
    """
    connection = sqlite3.connect(*args, **kwargs)
    try:
        yield connection
    finally:
        connection.close()