package documentation
DBXS (“D.B. Access”) is an SQL database access layer for Python.
| Package | adapters | No package docstring; 2/4 modules documented |
| Module | async | This is a collection of abstract types that are like the PEP 249 types in dbxs.dbapi, but with await put in the relevant places to make them asynchronous. |
| Module | dbapi | This module is a description of the abstract types described by PEP 249 Database API 2.0. |
| Module | testing | Testing support for dbxs. |
| Module | __main__ | Undocumented |
| Module | _access | No module docstring; 0/1 variable, 0/3 type variable, 0/4 constant, 1/2 function, 1/1 exception, 3/9 classes documented |
| Module | _enumerate | Undocumented |
| Module | _repository | A repository combines a collection of accessors. |
| Module | _testing | No module docstring; 0/3 type alias, 0/1 type variable, 1/1 function, 0/4 class documented |
| Module | _typing | Since we support a range of Python and Mypy versions where certain features are available across typing and the typing_extensions fallback, we put those aliases here to avoid repeating conditional import logic. |
From __init__.py:
| Exception | | An access pattern defined extraneous methods. |
| Exception | | An assumption about the number of rows from a given query was violated; there were either too many or too few. |
| Exception | | There were not enough results for the query to satisfy one. |
| Exception | | The parameters required by the query are different from the parameters specified by the function. |
| Exception | | There were more results for a query than expected; more than one for one, or any at all for zero. |
| Function | accessor | Create a factory which binds a database transaction in the form of an AsyncConnection to a set of declared SQL methods. |
| Function | many | Fetch multiple results with a function to translate rows. |
| Function | maybe | Fetch a single result and pass it to a translator function, but return None if it's not found. |
| Function | one | Fetch a single result with a translator function. |
| Function | query | Declare a query method (i.e. a DQL statement). |
| Function | repository | A repository combines management of a transaction with management of a "repository", which is a collection of accessors and a contextmanager that manages a transaction. This is easier to show with an example than a description:... |
| Function | statement | Declare a data-modification (DML) method. |
| Variable | __version__ | Undocumented |
def accessor(accessPatternProtocol: Callable[[], T], prevalidationStyle: ParamStyle | Dialect = 'qmark') -> Callable[[AsyncConnection], T]:
Create a factory which binds a database transaction in the form of an AsyncConnection to a set of declared SQL methods.
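The default prevalidationStyle, 'qmark', is one of the parameter styles defined by PEP 249, in which every parameter is a positional ? placeholder. As a minimal illustration of that style only, the sketch below uses the standard library's sqlite3 driver rather than dbxs itself; the user table and its values are made up for the example.

```python
import sqlite3

# sqlite3 is used here only because it ships with Python and reports
# the PEP 249 'qmark' parameter style; this is not dbxs code.
assert sqlite3.paramstyle == "qmark"

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, name TEXT)")

# In 'qmark' style each parameter is a positional "?" placeholder,
# filled in order from the sequence passed alongside the SQL text.
con.execute("INSERT INTO user (id, name) VALUES (?, ?)", (1, "alice"))
row = con.execute("SELECT name FROM user WHERE id = ?", (1,)).fetchone()
print(row)
con.close()
```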
def maybe(load: Callable[..., T]) -> Callable[[object, AsyncCursor], Coroutine[object, object, T | None]]:
Fetch a single result and pass it to a translator function, but return None if it's not found.
def one(load: Callable[..., T]) -> Callable[[object, AsyncCursor], Coroutine[object, object, T]]:
Fetch a single result with a translator function.
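The difference between one and maybe is in how they treat the number of rows a query produces. The following is a rough, standard-library-only sketch of that behavior, not the dbxs implementation: FakeCursor, NotEnough, TooMany, load_one, and load_maybe are hypothetical names, and both passing each row's columns to the translator as positional arguments and raising when maybe sees more than one row are assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


class NotEnough(Exception):
    """Hypothetical stand-in for a 'not enough results' error."""


class TooMany(Exception):
    """Hypothetical stand-in for a 'too many results' error."""


class FakeCursor:
    """Hypothetical in-memory cursor with an awaitable fetchall()."""

    def __init__(self, rows: List[Tuple[Any, ...]]) -> None:
        self._rows = list(rows)

    async def fetchall(self) -> List[Tuple[Any, ...]]:
        return self._rows


async def load_one(load: Callable[..., T], cursor: FakeCursor) -> T:
    # Exactly one row is required; anything else is an error.
    rows = await cursor.fetchall()
    if len(rows) < 1:
        raise NotEnough()
    if len(rows) > 1:
        raise TooMany()
    return load(*rows[0])


async def load_maybe(load: Callable[..., T], cursor: FakeCursor) -> Optional[T]:
    # Zero rows is acceptable and yields None (assumption: more than
    # one row is still treated as an error).
    rows = await cursor.fetchall()
    if not rows:
        return None
    if len(rows) > 1:
        raise TooMany()
    return load(*rows[0])


@dataclass
class User:
    id: int
    name: str


found = asyncio.run(load_one(User, FakeCursor([(1, "alice")])))
missing = asyncio.run(load_maybe(User, FakeCursor([])))
print(found, missing)
```

With an empty result, load_one raises where load_maybe returns None, which mirrors the distinction described for the two loaders.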
def query(*, sql: str | ReturnsRows, load: Callable[[AccessProxy, AsyncCursor], A]) -> Callable[[Callable[P, A]], Callable[P, A]]:
Declare a query method (i.e. a DQL statement).
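The split between query (DQL) and statement (DML) follows whether the SQL returns rows. As a small illustration with a PEP 249 driver from the standard library (sqlite3, not dbxs; the post table is made up for the example), a cursor exposes column metadata after a SELECT but not after a plain INSERT:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT)")

# DML: modifies data and produces no result rows, so the cursor has
# no column description; rowcount reports the affected rows instead.
dml = con.execute("INSERT INTO post (title) VALUES (?)", ("hello",))
dml_has_rows = dml.description is not None
dml_rowcount = dml.rowcount

# DQL: produces rows, so the cursor describes the result columns.
dql = con.execute("SELECT id, title FROM post")
dql_columns = [column[0] for column in dql.description]
rows = dql.fetchall()

print(dml_has_rows, dml_rowcount, dql_columns, rows)
con.close()
```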
def repository(repositoryType: type[T]) -> Callable[[AsyncConnectable], AsyncContextManager[T]]:
A repository combines management of a transaction with management of a "repository", which is a collection of accessors and a contextmanager that manages a transaction. This is easier to show with an example than a description:
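from dataclasses import dataclass
from typing import AsyncIterator, Protocol

from dbxs import many, one, query, repository

# User, Post, UserID, and AsyncConnectable are assumed to be defined or imported elsewhere.

class Users(Protocol):
    @query(sql="...", load=one(User))
    async def getUserByID(self, id: UserID) -> User: ...

class Posts(Protocol):
    @query(sql="...", load=many(Post))
    def getPostsFromUser(self, id: UserID) -> AsyncIterator[Post]: ...

@dataclass
class BlogDB:
    users: Users
    posts: Posts

blogRepository = repository(BlogDB)

# ...
async def userAndPosts(pool: AsyncConnectable, id: UserID) -> str:
    async with blogRepository(pool) as blog:
        user = await blog.users.getUserByID(id)
        # many() yields rows asynchronously, so collect them with "async for"
        posts = [post async for post in blog.posts.getPostsFromUser(id)]
    # transaction commits here
    return f"{user}: {len(posts)} post(s)"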
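The "transaction commits here" behavior in the example above is the usual context-manager pattern: commit when the block exits normally, roll back when it raises. The sketch below shows that pattern using only the standard library; it is not how dbxs implements repository, and the transaction helper and post table are hypothetical names chosen for the illustration. sqlite3 calls are blocking, which is acceptable for a demonstration but not for real asynchronous code.

```python
import asyncio
import sqlite3
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


@asynccontextmanager
async def transaction(con: sqlite3.Connection) -> AsyncIterator[sqlite3.Connection]:
    """Hypothetical helper: commit on success, roll back on error."""
    try:
        yield con
    except BaseException:
        con.rollback()
        raise
    else:
        con.commit()


async def demo() -> List[str]:
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE post (title TEXT)")
    con.commit()

    try:
        async with transaction(con):
            con.execute("INSERT INTO post VALUES (?)", ("lost",))
            raise RuntimeError("boom")
    except RuntimeError:
        pass  # the insert above was rolled back

    async with transaction(con):
        con.execute("INSERT INTO post VALUES (?)", ("kept",))
    # the second transaction has been committed at this point

    titles = [title for (title,) in con.execute("SELECT title FROM post")]
    con.close()
    return titles


titles = asyncio.run(demo())
print(titles)
```

Only the row written inside the block that exited normally survives, which is the guarantee the repository context manager is described as providing around its accessors.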