# Copyright 2017 Bloomberg Finance L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides a DB-API 2.0 compatible Comdb2 API.
Overview
========
This module provides a Comdb2 interface that conforms to `the Python Database
API Specification v2.0 <https://www.python.org/dev/peps/pep-0249/>`_.

Basic Usage
-----------

The main class used for interacting with a Comdb2 database is `Connection`,
which you create by calling the `connect` factory function. Any errors that
are encountered when connecting to or querying the database are raised as
instances of the `Error` class.

A basic usage example looks like this::

    from comdb2 import dbapi2

    conn = dbapi2.connect('mattdb', autocommit=True)
    cursor = conn.cursor()
    cursor.execute("select 1, 'a' union all select 2, 'b'")
    for row in cursor.fetchall():
        print(row)

The above would result in the following output::

    [1, 'a']
    [2, 'b']

To reduce the amount of boilerplate required for fetching result sets, we
implement two extensions to the interface required by the Python DB-API:
`Cursor` objects are iterable, yielding one row of the result set per
iteration, and `Cursor.execute` returns the `Cursor` itself. By utilizing
these extensions, the basic example can be shortened to::

    from comdb2 import dbapi2

    conn = dbapi2.connect('mattdb', autocommit=True)
    for row in conn.cursor().execute("select 1, 'a' union all select 2, 'b'"):
        print(row)

Graceful Teardown and Error Handling
------------------------------------

Non-trivial applications should guarantee that the `Connection` is closed when
it is no longer needed, preferably by using `contextlib.closing`. They should
also be prepared to handle any errors returned by the database. So, a more
thorough version of the example above would be::

    from comdb2 import dbapi2
    from contextlib import closing

    try:
        with closing(dbapi2.connect('mattdb', autocommit=True)) as conn:
            query = "select 1, 'a' union all select 2, 'b'"
            for row in conn.cursor().execute(query):
                print(row)
    except dbapi2.Error as exc:
        print("Comdb2 exception encountered: %s" % exc)

In this example, `contextlib.closing` is used to guarantee that
`Connection.close` is called at the end of the ``with`` block, and an
exception handler has been added for exceptions of type `Error`. All
exceptions raised by this module are subclasses of `Error`. See
:ref:`Exceptions` for details on when each exception type is raised.

Controlling the Type Used For Result Rows
-----------------------------------------

As you can see, rows are returned as a `list` of column values in positional
order. If you'd prefer to get the columns back as some other type, you can
set `Connection.row_factory` to one of the factories provided by
`comdb2.factories` - for example::

    from comdb2 import dbapi2
    from comdb2 import factories

    conn = dbapi2.connect('mattdb', autocommit=True)
    conn.row_factory = factories.dict_row_factory
    c = conn.cursor()
    for row in c.execute("select 1 as 'x', 2 as 'y' union all select 3, 4"):
        print(row)

This program will print each row as a `dict` rather than a `list`::

    {'y': 2, 'x': 1}
    {'y': 4, 'x': 3}
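
A row factory follows a simple protocol: it is called once per result set
with the list of column names, and must return a second callable that builds
one row object from the column values (this matches the declared type of
`Connection.row_factory`). As an illustrative sketch only - not the
implementation in `comdb2.factories` - a dict-producing factory could be
written like this:

```python
# Illustrative sketch of the row-factory protocol; this is not the
# comdb2.factories implementation, and no database is involved.
def simple_dict_row_factory(col_names):
    # Called once per result set, with the column names.
    def row_builder(col_values):
        # Called once per row; pairs each name with its value.
        return dict(zip(col_names, col_values))
    return row_builder

# Used standalone, just to show the two-stage call:
make_row = simple_dict_row_factory(["x", "y"])
print(make_row([1, 2]))  # {'x': 1, 'y': 2}
```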

Parameter Binding
-----------------

In real applications you'll often need to pass parameters into a SQL query.
This is done using parameter binding, either by name or by position.

By Name
~~~~~~~

In the query, placeholders are specified using ``%(name)s``, and a mapping of
``name`` to parameter value is passed to `Cursor.execute` along with the
query. The ``%(`` and ``)s`` are fixed, and the ``name`` inside them varies
for each parameter. For example:

>>> query = "select 25 between %(a)s and %(b)s"
>>> print(conn.cursor().execute(query, {'a': 20, 'b': 42}).fetchall())
[[1]]
>>> params = {'a': 20, 'b': 23}
>>> print(conn.cursor().execute(query, params).fetchall())
[[0]]

In this example, we run the query with two different sets of parameters,
producing different results. First, we execute the query with parameter
``a`` bound to ``20`` and ``b`` bound to ``42``. In this case, because
``20 <= 25 <= 42``, the expression evaluates to true, and a row containing
a single column with a value of ``1`` is returned.

When we run the same query with parameter ``b`` bound to ``23``, a row
containing a single column with a value of ``0`` is returned instead,
because ``20 <= 25 <= 23`` is false.

Warning:
    Because named parameters are bound using ``%(name)s``, other ``%`` signs
    in a query must be escaped. For example, ``WHERE name like 'M%'``
    becomes ``WHERE name LIKE 'M%%'``.

Danger:
    This applies even when no parameters are passed at all - passing
    ``None`` or no parameters behaves the same as passing an empty `dict`.
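
The doubling rule is easy to reason about because ``pyformat`` placeholders
use the same grammar as Python's own ``%``-style string formatting: a ``%%``
in the template collapses to a single literal ``%`` once substitution
happens. The sketch below uses ordinary Python string formatting, with no
database involved, purely to illustrate the escaping rule:

```python
# Plain Python %-formatting, shown only to illustrate why '%' must be
# doubled in pyformat-style queries; no database connection is involved.
template = "WHERE name LIKE 'M%%' AND score > %(min_score)s"

# After substitution, '%%' collapses to a single literal '%':
print(template % {"min_score": 90})
# WHERE name LIKE 'M%' AND score > 90
```

Note that the module binds the actual values on the server side rather than
splicing them into the SQL text; only the placeholder grammar is shared with
Python's formatting.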

By Position
~~~~~~~~~~~

You can bind parameters positionally rather than by name, by using ``?`` for
each placeholder and providing a list or tuple of parameter values. For
example:

>>> query = "select 25 between ? and ?"
>>> print(conn.cursor().execute(query, [20, 42]).fetchall())
[[1]]

In this example, we execute the query with the first ``?`` bound to 20 and
the second ``?`` bound to 42, so a row with a single column with a value of
``1`` is returned, as in the previous example.

Warning:
    Unlike when binding parameters by name, you must not escape ``%`` signs
    in the SQL when binding parameters positionally. For example, you could
    write ``WHERE val % 5 = ?``, but not ``WHERE val %% 5 = ?``.

Tip:
    You can pass an empty tuple of parameters to avoid the need to escape
    ``%`` signs in the SQL even when you don't want to bind any parameters:

    >>> query = "select 42 % 20"
    >>> print(conn.cursor().execute(query, ()).fetchall())
    [[2]]

    Compare this against what happens if you don't pass any parameters at
    all:

    >>> print(conn.cursor().execute(query).fetchall())
    Traceback (most recent call last):
        ...
    comdb2.dbapi2.InterfaceError: Invalid Python format string for query

Types
-----

For all Comdb2 types, the same Python type is used for binding a parameter
value as is returned for a SQL query result column of that type. In brief,
SQL types are mapped to Python types according to the following table:

============ ================================================================
SQL type     Python type
============ ================================================================
NULL         ``None``
integer      `int`
real         `float`
blob         `bytes`
text         `str`
datetime     `datetime.datetime`
datetimeus   `DatetimeUs`
============ ================================================================

See :ref:`Comdb2 to Python Type Mappings` for a thorough explanation of these
type mappings and their implications.

Note:
    This module uses byte strings to represent BLOB columns, and Unicode
    strings to represent TEXT columns. This is a very common source of
    problems for new users. Make sure to carefully read
    :ref:`String and Blob Types` on that page.
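
The practical consequence of the TEXT/BLOB split is that converting between
the two representations is always an explicit ``encode``/``decode`` step in
your own code. A database-free sketch of the conversions involved:

```python
# TEXT columns are represented as Python str (Unicode); BLOB columns as
# bytes. Moving data between the two is always an explicit conversion.
text_value = "naïve"                     # shape of a TEXT column value
blob_value = text_value.encode("utf-8")  # shape of a BLOB column value

print(blob_value)                   # the raw bytes, e.g. b'na\xc3\xafve'
print(blob_value.decode("utf-8"))   # back to the str 'naïve'
```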

.. _Autocommit Mode:

Autocommit Mode
---------------

In all of the examples above, we gave the ``autocommit=True`` keyword
argument when calling `connect`. This opts out of DB-API compliant
transaction handling, in order to use Comdb2's native transaction semantics.

By default, DB-API cursors are always in a transaction. You can commit that
transaction using `Connection.commit`, or roll it back using
`Connection.rollback`. For example::

    conn = dbapi2.connect('mattdb')
    cursor = conn.cursor()
    query = "insert into simple(key, val) values (%(key)s, %(val)s)"
    cursor.execute(query, {'key': 1, 'val': 2})
    cursor.execute(query, {'key': 3, 'val': 4})
    cursor.execute(query, {'key': 5, 'val': 6})
    conn.commit()

There are several things to note here. The first is that the insert
statements that were sent to the database don't take effect immediately,
because they are implicitly part of a transaction that must be explicitly
completed. This is different from other Comdb2 APIs, where you must execute
a ``BEGIN`` statement to start a transaction, and where statements otherwise
take effect immediately.

The second thing to note is that there are certain error conditions where
a Comdb2 connection can automatically recover when outside of a transaction,
but not within a transaction. In other words, using transactions when they
aren't needed can introduce new failure modes into your program.

In order to provide greater compatibility with other Comdb2 interfaces and
to eliminate the performance costs and extra error cases introduced by using
transactions unnecessarily, we allow you to pass the non-standard
``autocommit=True`` keyword argument when creating a new `Connection`. This
results in the implicit transaction not being created. You can still start
a transaction explicitly by passing a ``BEGIN`` statement to
`Cursor.execute`. For example::

    conn = dbapi2.connect('mattdb', autocommit=True)
    cursor = conn.cursor()
    cursor.execute("delete from simple where 1=1")
    cursor.execute("begin")
    query = "insert into simple(key, val) values (%(key)s, %(val)s)"
    cursor.execute(query, {'key': 1, 'val': 2})
    cursor.execute(query, {'key': 3, 'val': 4})
    cursor.execute(query, {'key': 5, 'val': 6})
    cursor.execute("rollback")

In this example, because we've used ``autocommit=True``, the delete
statement takes effect immediately (that is, it is automatically committed).
We then explicitly start a transaction, insert 3 rows, and then decide not
to commit it, instead explicitly rolling back the transaction.

To summarize: you cannot use ``autocommit`` mode if you intend to pass the
`Connection` to a library that requires DB-API compliant connections. You
should prefer ``autocommit`` mode when you don't want to use transactions
(for example, read-only queries where no particular consistency guarantees
are required across queries). If you do intend to use transactions but won't
pass the `Connection` to a library that requires DB-API compliance, you can
choose either mode. It may be easier to port existing code if you use
``autocommit`` mode, but avoiding ``autocommit`` mode may allow you to use
3rd party libraries in the future that require DB-API compliant connections.

DB-API Compliance
-----------------

The interface this module provides fully conforms to `the Python Database
API Specification v2.0 <https://www.python.org/dev/peps/pep-0249/>`_, with
a few specific exceptions:

1. DB-API requires ``Date`` and ``DateFromTicks`` constructors, which we
   don't provide because Comdb2 has no type for representing a date without
   a time component.
2. DB-API requires ``Time`` and ``TimeFromTicks`` constructors, which we
   don't provide because Comdb2 has no type for representing a time without
   a date component.
3. DB-API is unclear about the required behavior of multiple calls to
   `Connection.cursor` on a connection. Comdb2 does not have a concept of
   cursors as distinct from connection handles, so each time
   `Connection.cursor` is called, we call `Cursor.close` on any existing,
   open cursor for that connection.
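
Because there is no date-only or time-only type, one possible workaround -
an illustration only, not an API provided by this module - is to bind a full
`datetime.datetime` with the missing component zeroed out, and truncate it
again when reading results back:

```python
import datetime

# Hypothetical workaround for the missing Date type: represent a
# date-only value as a datetime at midnight, since Comdb2 only has
# full datetime types.
d = datetime.date(2017, 6, 1)
as_param = datetime.datetime.combine(d, datetime.time.min)

print(as_param)         # 2017-06-01 00:00:00
print(as_param.date())  # recover the date-only part: 2017-06-01
```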
"""
from __future__ import annotations
import functools
import itertools
import weakref
import datetime
import re
from . import cdb2
from .cdb2 import ColumnType, Row, Value, ParameterValue
from collections.abc import Callable, Iterator, Mapping, Sequence
from typing import Any, List
__all__ = [
    "apilevel",
    "threadsafety",
    "paramstyle",
    "connect",
    "ColumnType",
    "Connection",
    "Cursor",
    "STRING",
    "BINARY",
    "NUMBER",
    "DATETIME",
    "ROWID",
    "Datetime",
    "DatetimeUs",
    "Binary",
    "Timestamp",
    "TimestampUs",
    "DatetimeFromTicks",
    "DatetimeUsFromTicks",
    "TimestampFromTicks",
    "Error",
    "Warning",
    "InterfaceError",
    "DatabaseError",
    "InternalError",
    "OperationalError",
    "ProgrammingError",
    "IntegrityError",
    "DataError",
    "NotSupportedError",
    "UniqueKeyConstraintError",
    "ForeignKeyConstraintError",
    "NonNullConstraintError",
    "Value",
    "ParameterValue",
]
apilevel = "2.0"
"""This module conforms to the Python Database API Specification 2.0."""
threadsafety = 1
"""Two threads can use this module, but can't share one `Connection`."""
paramstyle = "pyformat"
"""The SQL placeholder format for this module is ``%(name)s``.
Comdb2's native placeholder format is ``@name``, but that cannot be used by
this module because it's not an acceptable `DB-API 2.0 placeholder style
<https://www.python.org/dev/peps/pep-0249/#paramstyle>`_. This module uses
``pyformat`` for named parameters because it is the only DB-API 2.0 paramstyle
that we can translate into Comdb2's placeholder format without needing
a SQL parser. This module also supports the ``qmark`` parameter style for
binding parameters positionally.
Note:
An int value is bound as ``%(my_int)s``, not as ``%(my_int)d`` - the last
character is always ``s``.
Note:
When binding parameters by name, any ``%`` sign is recognized as the start
of a ``pyformat`` style placeholder, and so any literal ``%`` characters in
a SQL statement must be escaped by doubling. ``WHERE name like 'M%'``
becomes ``WHERE name LIKE 'M%%'``. This does not apply when binding
parameters positionally with ``?`` placeholders, nor when the literal ``%``
appears in a parameter value as opposed to literally in the query.
In either of those cases, the ``%`` characters must not be escaped.
Warning:
Literal ``%`` signs in the query must be escaped when no parameters are
passed at all -- passing ``None`` or no parameters behaves the same as
passing an empty `dict`. You can avoid the need to escape ``%`` signs in
an unparametrized query by instead passing an empty `tuple` as parameters,
which causes the statement to be treated as having ``qmark`` placeholders
instead of ``pyformat`` placeholders.
"""
_FIRST_WORD_OF_STMT = re.compile(
    r"""
    (?:            # match (without capturing)
        \s*        # optional whitespace
        /\*.*?\*/  # then a C-style /* ... */ comment, possibly across lines
    |              # or
        \s*        # optional whitespace
        --[^\n]*\n # then a SQL-style comment terminated by a newline
    )*             # repeat until all comments have been matched
    \s*            # then skip over any whitespace
    (\w+)          # and capture the first word
    """,
    re.VERBOSE | re.DOTALL | re.ASCII,
)
_VALID_SP_NAME = re.compile(r"^[A-Za-z0-9_.]+$")


@functools.total_ordering
class _TypeObject:
    def __init__(self, *value_names):
        self.value_names = value_names
        self.values = [cdb2.TYPE[v] for v in value_names]

    def __eq__(self, other):
        return other in self.values

    def __lt__(self, other):
        return self != other and other < self.values

    def __repr__(self):
        return "TypeObject" + str(self.value_names)


def _binary(string: str | bytes) -> bytes:
    if isinstance(string, str):
        return string.encode("utf-8")
    return bytes(string)
STRING = _TypeObject("CSTRING")
"""The type codes of TEXT result columns compare equal to this constant."""
BINARY = _TypeObject("BLOB")
"""The type codes of BLOB result columns compare equal to this constant."""
NUMBER = _TypeObject("INTEGER", "REAL")
"""The type codes of numeric result columns compare equal to this constant."""
DATETIME = _TypeObject("DATETIME", "DATETIMEUS")
"""The type codes of datetime result columns compare equal to this constant."""
ROWID = STRING
# comdb2 doesn't support Date or Time, so I'm not defining them.
Datetime = datetime.datetime
DatetimeUs = cdb2.DatetimeUs
Binary = _binary
Timestamp = Datetime
TimestampUs = DatetimeUs
DatetimeFromTicks = Datetime.fromtimestamp
DatetimeUsFromTicks = DatetimeUs.fromtimestamp
TimestampFromTicks = Timestamp.fromtimestamp
TimestampUsFromTicks = TimestampUs.fromtimestamp
UserException = Exception


class Error(UserException):
    """This is the base class of all exceptions raised by this module.

    In addition to being available at the module scope, this class and the
    other exception classes derived from it are exposed as attributes on
    `Connection` objects, to simplify error handling in environments where
    multiple connections from different modules are used.
    """

    pass


class Warning(UserException):
    """Exception raised for important warnings.

    This is required to exist by the DB-API interface, but we never raise it.
    """

    pass


class InterfaceError(Error):
    """Exception raised for errors caused by misuse of this module."""

    pass


class DatabaseError(Error):
    """Base class for all errors reported by the database."""

    pass


class InternalError(DatabaseError):
    """Exception raised for internal errors reported by the database."""

    pass


class OperationalError(DatabaseError):
    """Exception raised for errors related to the database's operation.

    These errors are not necessarily the result of a bug either in the
    application or in the database - for example, dropped connections.
    """

    pass


class ProgrammingError(DatabaseError):
    """Exception raised for programming errors reported by the database.

    For example, this will be raised for syntactically incorrect SQL, or for
    passing a different number of parameters than are required by the query.
    """

    pass


class IntegrityError(DatabaseError):
    """Exception raised for integrity errors reported by the database.

    For example, a subclass of this will be raised if a foreign key
    constraint would be violated, or a constraint that a column may not be
    null, or that an index may not have duplicates. Other types of
    constraint violations may raise this type directly.
    """

    pass


class UniqueKeyConstraintError(IntegrityError):
    """Exception raised when a unique key constraint would be broken.

    Committing after either an INSERT or an UPDATE could result in this
    being raised, by either adding a new row that violates a unique
    (non-dup) key constraint or modifying an existing row in a way that
    would violate one.

    .. versionadded:: 1.1
    """

    pass


class ForeignKeyConstraintError(IntegrityError):
    """Exception raised when a foreign key constraint would be broken.

    This would be raised when committing if the changes being committed
    would violate referential integrity according to a foreign key
    constraint configured on the database. For instance, deleting a row
    from a parent table would raise this if rows corresponding to its key
    still exist in a child table and the constraint doesn't have ON DELETE
    CASCADE. Likewise, inserting a row into a child table would raise this
    if there was no row in the parent table matching the new row's key.

    .. versionadded:: 1.1
    """

    pass


class NonNullConstraintError(IntegrityError):
    """Exception raised when a non-null constraint would be broken.

    Committing after either an INSERT or an UPDATE could result in this
    being raised if it would result in a null being stored in a
    non-nullable column. Note that columns in a Comdb2 are not nullable by
    default.

    .. versionadded:: 1.1
    """

    pass


class DataError(DatabaseError):
    """Exception raised for errors related to the processed data.

    For example, this will be raised if you attempt to write a value that's
    out of range for the column type that would store it, or if you specify
    an invalid timezone name for the connection.
    """

    pass


class NotSupportedError(DatabaseError):
    """Exception raised when unsupported operations are attempted."""

    pass


_EXCEPTION_BY_RC = {
    cdb2.ERROR_CODE["CONNECT_ERROR"]: OperationalError,
    cdb2.ERROR_CODE["NOTCONNECTED"]: ProgrammingError,
    cdb2.ERROR_CODE["PREPARE_ERROR"]: ProgrammingError,
    cdb2.ERROR_CODE["IO_ERROR"]: OperationalError,
    cdb2.ERROR_CODE["INTERNAL"]: InternalError,
    cdb2.ERROR_CODE["NOSTATEMENT"]: ProgrammingError,
    cdb2.ERROR_CODE["BADCOLUMN"]: ProgrammingError,
    cdb2.ERROR_CODE["BADSTATE"]: ProgrammingError,
    cdb2.ERROR_CODE["ASYNCERR"]: OperationalError,
    cdb2.ERROR_CODE["INVALID_ID"]: InternalError,
    cdb2.ERROR_CODE["RECORD_OUT_OF_RANGE"]: OperationalError,
    cdb2.ERROR_CODE["REJECTED"]: OperationalError,
    cdb2.ERROR_CODE["STOPPED"]: OperationalError,
    cdb2.ERROR_CODE["BADREQ"]: OperationalError,
    cdb2.ERROR_CODE["DBCREATE_FAILED"]: OperationalError,
    cdb2.ERROR_CODE["THREADPOOL_INTERNAL"]: OperationalError,
    cdb2.ERROR_CODE["READONLY"]: NotSupportedError,
    cdb2.ERROR_CODE["NOMASTER"]: InternalError,
    cdb2.ERROR_CODE["UNTAGGED_DATABASE"]: NotSupportedError,
    cdb2.ERROR_CODE["CONSTRAINTS"]: IntegrityError,
    cdb2.ERROR_CODE["DEADLOCK"]: OperationalError,
    cdb2.ERROR_CODE["TRAN_IO_ERROR"]: OperationalError,
    cdb2.ERROR_CODE["ACCESS"]: OperationalError,
    cdb2.ERROR_CODE["TRAN_MODE_UNSUPPORTED"]: NotSupportedError,
    cdb2.ERROR_CODE["VERIFY_ERROR"]: OperationalError,
    cdb2.ERROR_CODE["FKEY_VIOLATION"]: ForeignKeyConstraintError,
    cdb2.ERROR_CODE["NULL_CONSTRAINT"]: NonNullConstraintError,
    cdb2.ERROR_CODE["CONV_FAIL"]: DataError,
    cdb2.ERROR_CODE["NONKLESS"]: NotSupportedError,
    cdb2.ERROR_CODE["MALLOC"]: OperationalError,
    cdb2.ERROR_CODE["NOTSUPPORTED"]: NotSupportedError,
    cdb2.ERROR_CODE["DUPLICATE"]: UniqueKeyConstraintError,
    cdb2.ERROR_CODE["TZNAME_FAIL"]: DataError,
    cdb2.ERROR_CODE["UNKNOWN"]: OperationalError,
}


def _raise_wrapped_exception(exc):
    code = exc.error_code
    msg = "%s (cdb2api rc %d)" % (exc.error_message, code)
    if "null constraint violation" in msg:
        raise NonNullConstraintError(msg) from exc  # DRQS 86013831
    raise _EXCEPTION_BY_RC.get(code, OperationalError)(msg) from exc


def _sql_operation(sql):
    match = _FIRST_WORD_OF_STMT.match(sql)
    if match:
        return match.group(1).lower()
    return None


def _operation_ends_transaction(operation):
    return operation == "commit" or operation == "rollback"


def _modifies_rows(operation):
    # These operations can modify the contents of the database.
    # exec is deliberately excluded because it might return a result set,
    # and this function is used to determine whether it's safe to call
    # cdb2_get_effects after running the operation.
    return operation in ("commit", "insert", "update", "delete")


def connect(
    database_name: str | bytes,
    tier: str | bytes = "default",
    autocommit: bool = False,
    host: str | bytes | None = None,
) -> Connection:
    """Establish a connection to a Comdb2 database.

    All arguments are passed directly through to the `Connection`
    constructor.

    Note:
        DB-API 2.0 requires the module to expose `connect`, but not
        `Connection`. If portability across database modules is a concern,
        you should always use `connect` to create your connections rather
        than calling the `Connection` constructor directly.

    Returns:
        Connection: A handle for the newly established connection.
    """
    return Connection(
        database_name=database_name,
        tier=tier,
        autocommit=autocommit,
        host=host,
    )


class Connection:
    """Represents a connection to a Comdb2 database.

    By default, the connection will be made to the cluster configured as the
    machine-wide default for the given database. This is almost always what
    you want. If you need to connect to a database that's running on your
    local machine rather than a cluster, you can pass "local" as the
    ``tier``. It's also permitted to specify "dev", "alpha", "beta", or
    "prod" as the ``tier``, which will connect you directly to the tier you
    specify (firewall permitting).

    Alternately, you can pass a machine name as the ``host`` argument, to
    connect directly to an instance of the given database on that host,
    rather than on a cluster or the local machine.

    The connection will use UTC as its timezone by default - you can change
    this with a ``SET TIMEZONE`` statement if needed.

    By default, or if ``autocommit`` is ``False``, `cursor` will return
    cursors that behave as mandated by the Python DB-API: every statement
    to be executed is implicitly considered to be part of a transaction,
    and that transaction must be ended explicitly with a call to `commit`
    (or `rollback`). If ``autocommit`` is ``True``, `cursor` will instead
    return cursors that behave more in line with Comdb2's traditional
    behavior: the side effects of any given statement are immediately
    committed unless you previously started a transaction by executing
    a ``begin`` statement.

    Note:
        Using ``autocommit=True`` will ease porting from code using other
        Comdb2 APIs, both because other Comdb2 APIs implicitly commit after
        each statement in the same way as an autocommit `Connection` will,
        and because there are certain operations that Comdb2 will
        implicitly retry when outside of a transaction but won't retry when
        inside a transaction - meaning that a non-autocommit `Connection`
        has extra failure modes. You should strongly consider using
        ``autocommit=True``, especially for read-only use cases.

    Note:
        Python does not guarantee that object finalizers will be called
        when the interpreter exits, so to ensure that the connection is
        cleanly released you should call the `close` method when you're
        done with it. You can use `contextlib.closing` to guarantee the
        connection is released when a block completes.

    Note:
        DB-API 2.0 requires the module to expose `connect`, but not
        `Connection`. If portability across database modules is a concern,
        you should always use `connect` to create your connections rather
        than instantiating this class directly.

    Args:
        database_name (str): The name of the database to connect to.
        tier (str): The cluster to connect to.
        host (str): Alternately, a single remote host to connect to.
        autocommit (bool): Whether to automatically commit after DML
            statements, disabling DB-API 2.0's automatic implicit
            transactions.
    """
    def __init__(
        self,
        database_name: str | bytes,
        tier: str | bytes = "default",
        autocommit: bool = False,
        host: str | bytes | None = None,
    ) -> None:
        if host is not None and tier != "default":
            raise InterfaceError(
                "Connecting to a host by name and to a "
                "cluster by tier are mutually exclusive"
            )
        self._active_cursor = None
        self._in_transaction = False
        self._autocommit = autocommit
        try:
            self._hndl = cdb2.Handle(database_name, tier, host=host)
        except cdb2.Error as e:
            _raise_wrapped_exception(e)

    def _check_closed(self):
        if self._hndl is None:
            raise InterfaceError("Attempted to use a closed Connection")

    @property
    def row_factory(
        self,
    ) -> Callable[[list[str]], Callable[[list[Value]], Row]]:
        """Factory used when constructing result rows.

        By default, or when set to ``None``, each row is returned as a
        `list` of column values. If you'd prefer to receive rows as a
        `dict` or as a `collections.namedtuple`, you can set this property
        to one of the factories provided by the `comdb2.factories` module.

        Example:
            >>> from comdb2.factories import dict_row_factory
            >>> conn.row_factory = dict_row_factory
            >>> cursor = conn.cursor()
            >>> cursor.execute("SELECT 1 as 'foo', 2 as 'bar'")
            >>> cursor.fetchone()
            {'foo': 1, 'bar': 2}

        .. versionadded:: 0.9
        """
        self._check_closed()
        return self._hndl.row_factory

    @row_factory.setter
    def row_factory(
        self, value: Callable[[list[str]], Callable[[list[Value]], Row]]
    ) -> None:
        self._check_closed()
        self._hndl.row_factory = value

    def _close_any_outstanding_cursor(self):
        if self._active_cursor is not None:
            cursor = self._active_cursor()
            if cursor is not None and not cursor._closed:
                cursor.close()

    def _execute(self, operation):
        cursor = None
        if self._active_cursor is not None:
            cursor = self._active_cursor()
        if cursor is None:
            cursor = self.cursor()
        cursor._execute(operation, operation)

    def close(self, ack_current_event: bool = True) -> None:
        """Gracefully close the Comdb2 connection.

        Once a `Connection` has been closed, no further operations may be
        performed on it.

        If the connection was used to consume events from a `Lua
        consumer`__, then *ack_current_event* tells the database what to do
        with the last event that was delivered. By default it will be
        marked as consumed and won't be redelivered, but if
        ``ack_current_event=False`` then the event will be redelivered to
        another consumer for processing.

        __ https://bloomberg.github.io/comdb2/triggers.html#lua-consumers

        If a socket pool is running on the machine and the connection was
        in a clean state, this will turn over the connection to the socket
        pool. This cannot be done if the connection was in a transaction or
        in the middle of retrieving a result set. Other restrictions may
        apply as well.

        You can ensure that this gets called at the end of a block using
        something like:

        >>> with contextlib.closing(connect('mattdb')) as conn:
        ...     for row in conn.cursor().execute("select 1"):
        ...         print(row)
        """
        if self._hndl is None:
            raise InterfaceError("close() called on already closed connection")
        self._close_any_outstanding_cursor()
        self._hndl.close(ack_current_event=ack_current_event)
        self._hndl = None

    def commit(self) -> None:
        """Commit any pending transaction to the database.

        This method will fail if the `Connection` is in ``autocommit`` mode
        and no transaction was explicitly started.
        """
        self._check_closed()
        self._execute("commit")

    def rollback(self) -> None:
        """Roll back the current transaction.

        This method will fail if the `Connection` is in ``autocommit`` mode
        and no transaction was explicitly started.

        Note:
            Closing a connection without committing the changes first will
            cause an implicit rollback to be performed, but will also
            prevent that connection from being contributed to the socket
            pool, if one is available. Because of this, an explicit
            rollback should be preferred when possible.
        """
        self._check_closed()
        self._execute("rollback")

    def cursor(self) -> Cursor:
        """Return a new `Cursor` for this connection.

        This calls `Cursor.close` on any outstanding `Cursor`; only one
        `Cursor` is allowed per `Connection` at a time.

        Note:
            Although outstanding cursors are closed, uncommitted
            transactions started by them are not rolled back, so the new
            `Cursor` will begin in the middle of that uncommitted
            transaction.

        Returns:
            Cursor: A new cursor on this connection.
        """
        self._check_closed()
        self._close_any_outstanding_cursor()
        cursor = Cursor(self)
        self._active_cursor = weakref.ref(cursor)
        return cursor

    # Optional DB API Extension
    Error = Error
    Warning = Warning
    InterfaceError = InterfaceError
    DatabaseError = DatabaseError
    InternalError = InternalError
    OperationalError = OperationalError
    ProgrammingError = ProgrammingError
    IntegrityError = IntegrityError
    DataError = DataError
    NotSupportedError = NotSupportedError


class Cursor:
    """Class used to send requests through a database connection.

    This class is not meant to be instantiated directly; it should always
    be created using `Connection.cursor`. It provides methods for sending
    requests to the database and for reading back the result sets produced
    by the database.

    Queries are made using the `execute` and `callproc` methods. Result
    sets can be consumed with the `fetchone`, `fetchmany`, or `fetchall`
    methods, or (as a nonstandard DB-API 2.0 extension) by iterating over
    the `Cursor`.

    Note:
        Only one `Cursor` per `Connection` can exist at a time. Creating a
        new one will `close` the previous one.
    """

    _ErrorMessagesByOperation = {
        "begin": "Transactions may not be started explicitly",
        "commit": "Use Connection.commit to commit transactions",
        "rollback": "Use Connection.rollback to roll back transactions",
    }

    def __init__(self, conn: Connection) -> None:
        self._arraysize = 1
        self._conn = conn
        self._hndl = conn._hndl
        self._description = None
        self._closed = False
        self._rowcount = -1

    def _check_closed(self):
        if self._closed:
            raise InterfaceError("Attempted to use a closed cursor")

    @property
    def arraysize(self) -> int:
        """Controls the number of rows to fetch at a time with `fetchmany`.

        The default is ``1``, meaning that a single row will be fetched at
        a time.
        """
        return self._arraysize

    @arraysize.setter
    def arraysize(self, value: int) -> None:
        self._arraysize = value

    @property
    def description(
        self,
    ) -> tuple[tuple[str, object, None, None, None, None, None], ...] | None:
        """Provides the name and type of each column in the latest result set.

        This read-only attribute will contain one element per column in the
        result set. Each of those elements will be a 7-element sequence
        whose first element is the name of that column, whose second
        element is a type code for that column, and whose five remaining
        elements are ``None``.

        The type codes can be compared for equality against the `STRING`,
        `NUMBER`, `DATETIME`, and `BINARY` objects exposed by this module.
        If you need more granularity (e.g. knowing whether the result is
        a ``REAL`` or an ``INTEGER``) you can compare the type code for
        equality against the members of the `.cdb2.TYPE` dictionary. Or, of
        course, you can use `isinstance` to check the type of the object
        returned as that column's value.

        Example:
            >>> cursor = connect('mattdb').cursor()
            >>> cursor.execute("select 1 as 'x', '2' as 'y', 3.0 as 'z'")
            >>> cursor.description[0][:2] == ('x', NUMBER)
            True
            >>> cursor.description[1][:2] == ('y', STRING)
            True
            >>> cursor.description[2][:2] == ('z', NUMBER)
            True
            >>> cursor.description[2][:2] == ('z', TYPE['INTEGER'])
            False
            >>> cursor.description[2][:2] == ('z', TYPE['REAL'])
            True
        """
        self._check_closed()
        return self._description
@property
def rowcount(self) -> int:
"""Provides the count of rows modified by the last transaction.
For `Cursor` objects on a `Connection` that is not using ``autocommit``
mode, this count is valid only after the transaction is committed with
`Connection.commit()`. For `Cursor` objects on a `Connection` that is
using ``autocommit`` mode, this count is valid after a successful
``COMMIT``, or after an ``INSERT``, ``UPDATE``, or ``DELETE`` statement
run outside of an explicit transaction. At all other times, ``-1`` is
returned.
In particular, ``-1`` is returned whenever a transaction is in
progress, because Comdb2 by default handles commit conflicts with other
transactions by rerunning each statement of the transaction. As
a result, row counts obtained within a transaction are meaningless in
the default transaction level; either more or fewer rows may be
affected when the transaction eventually commits.
Also, ``-1`` is returned after ``SELECT`` or ``SELECTV``, because
querying the row count requires calling ``cdb2_get_effects``, which
would consume the result set before the user could iterate over it.
Likewise, ``-1`` is returned after ``EXEC PROCEDURE``, because a stored
procedure could emit a result set.
"""
self._check_closed()
return self._rowcount
# Optional DB API Extension
@property
def connection(self) -> Connection:
"""Return a reference to the `Connection` that this `Cursor` uses."""
self._check_closed()
return self._conn
def close(self) -> None:
"""Close the cursor now.
From this point forward an exception will be raised if any
operation is attempted with this `Cursor`.
Note:
This does not roll back any uncommitted operations executed by this
`Cursor`. A new `Cursor` created from the `Connection` that this
`Cursor` uses will start off in the middle of that uncommitted
transaction.
"""
self._check_closed()
self._description = None
self._closed = True
def callproc(self, procname: str, parameters: Sequence[ParameterValue]) -> Sequence[ParameterValue]:
"""Call a stored procedure with the given name.
The ``parameters`` sequence must contain one entry for each argument
that the procedure requires.
If the called procedure emits a result set, it is made available
through the fetch methods, or by iterating over the `Cursor`, as though
it were returned by a ``select`` statement.
Args:
procname (str): The name of the stored procedure to be executed.
parameters (Sequence[Any]): A sequence of values to be passed, in
order, as the parameters to the stored procedure. Each element
must be an instance of one of the Python types listed in
:doc:`types`.
Returns:
List[Any]: A copy of the input parameters.
"""
if not _VALID_SP_NAME.match(procname):
raise NotSupportedError("Invalid procedure name '%s'" % procname)
params_as_dict = {str(i): e for i, e in enumerate(parameters)}
sql = (
"exec procedure "
+ procname
+ "("
+ ", ".join("%%(%d)s" % i for i in range(len(params_as_dict)))
+ ")"
)
self.execute(sql, params_as_dict)
return parameters[:]
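The SQL construction above is plain string formatting, so it can be sketched in isolation. The helper below mirrors the logic of `callproc` without touching a database; the procedure name ``myproc`` is purely illustrative.

```python
def build_callproc_sql(procname, parameters):
    # Each positional argument becomes a named parameter "0", "1", ...
    params_as_dict = {str(i): e for i, e in enumerate(parameters)}
    # "%%(%d)s" % i renders as a literal %(i)s placeholder, matching
    # the named-placeholder style that execute() expects.
    sql = (
        "exec procedure "
        + procname
        + "("
        + ", ".join("%%(%d)s" % i for i in range(len(params_as_dict)))
        + ")"
    )
    return sql, params_as_dict

sql, params = build_callproc_sql("myproc", ["a", 2])
print(sql)     # exec procedure myproc(%(0)s, %(1)s)
print(params)  # {'0': 'a', '1': 2}
```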
def execute(
self,
sql: str,
parameters: Mapping[str, ParameterValue] | Sequence[ParameterValue] | None = None,
*,
column_types: Sequence[ColumnType] | None = None,
) -> Cursor:
"""Execute a database operation (query or command).
The ``sql`` string may contain either named placeholders represented
as ``%(name)s`` or positionally ordered placeholders represented
as ``?``.
When the parameters are a mapping or ``None``, named placeholders are
being used, and so any literal ``%`` signs in the statement must be
escaped by doubling them, to distinguish them from the start of a named
placeholder. When a sequence of parameters is provided instead, any
placeholders must be positional ``?`` placeholders, and literal ``%``
signs in the SQL must not be escaped.
Note:
Using placeholders should always be the preferred method of
parameterizing the SQL query, as it prevents SQL injection
vulnerabilities, and is faster than dynamically building SQL
strings.
If ``column_types`` is provided and non-empty, it must be a sequence of
members of the `ColumnType` enumeration. The database will coerce the
data in the Nth column of the result set to the Nth given column type.
An error will be raised if the number of elements in ``column_types``
doesn't match the number of columns in the result set, or if one of the
elements is not a supported column type, or if coercion fails. If
``column_types`` is empty or not provided, no coercion is performed.
Note:
Database APIs are not required to allow result set column types to
be specified explicitly. We allow this as a non-standard DB-API 2.0
extension.
Args:
sql (str): The SQL string to execute, as a Python format string.
parameters (Mapping[str, Any] | Sequence[Any]):
If the SQL statement has ``%(param_name)s`` style placeholders,
you must pass a mapping from parameter name to value.
If the SQL statement has ``?`` style placeholders, you must
instead pass an ordered sequence of parameter values.
column_types (Sequence[int]): An optional sequence of types (values
of the `ColumnType` enumeration) which the columns of the
result set will be coerced to.
Returns:
Cursor: As a nonstandard DB-API 2.0 extension, this method returns
the `Cursor` that it was called on, which can be used as an
iterator over the result set returned by the query. Iterating over
it will yield one ``list`` per row in the result set, where the
elements in the list correspond to the result columns within the
row, in positional order.
The `Connection.row_factory` property can be used to return rows as
a different type.
Example:
>>> cursor.execute("select 1, 2 UNION ALL select %(x)s, %(y)s",
... {'x': 2, 'y': 4})
>>> cursor.fetchall()
[[1, 2], [2, 4]]
>>> cursor.execute("select 1, 2 UNION ALL select ?, ?", [2, 4])
>>> cursor.fetchall()
[[1, 2], [2, 4]]
"""
self._check_closed()
self._description = None
operation = _sql_operation(sql)
if not self._conn._autocommit:
# Certain operations are forbidden when not in autocommit mode.
errmsg = self._ErrorMessagesByOperation.get(operation)
if errmsg:
raise InterfaceError(errmsg)
self._execute(operation, sql, parameters, column_types=column_types)
if self._rowcount == -1:
self._load_description()
# Optional DB API Extension: execute's return value is unspecified. We
# return an iterable over the rows, but this isn't portable across DBs.
return self
def executemany(
self, sql: str, seq_of_parameters: Sequence[Mapping[str, ParameterValue]] | Sequence[Sequence[ParameterValue]]
) -> None:
"""Execute the same SQL statement repeatedly with different parameters.
This is currently equivalent to calling execute multiple times, once
for each element provided in ``seq_of_parameters``.
Args:
sql (str): The SQL string to execute, as a Python format string of
the format expected by `execute`.
seq_of_parameters (Sequence[Mapping[str, Any]] | Sequence[Sequence[Any]]):
The ``sql`` statement will be executed once per element in this
sequence, using each successive element as the parameter values
for the corresponding call to `.execute`.
"""
self._check_closed()
for parameters in seq_of_parameters:
self.execute(sql, parameters)
def _execute(self, operation, sql, parameters=None, *, column_types=None):
self._rowcount = -1
if not self._conn._autocommit:
# Any non-SET operation starts a txn when not in autocommit mode.
if not self._conn._in_transaction and operation != "set":
try:
self._hndl.execute("begin")
except cdb2.Error as e:
_raise_wrapped_exception(e)
self._conn._in_transaction = True
if parameters is None:
parameters = {}
try:
# If variable interpolation fails, then translate the exception to
# an InterfaceError to signal that it's a client-side problem.
# If binding by index then no need to modify sql
by_name = hasattr(parameters, "items")
if by_name:
sql = sql % {name: "@" + name for name in parameters}
except KeyError as keyerr:
msg = "No value provided for parameter %s" % keyerr
raise InterfaceError(msg) from keyerr
except Exception as exc:
msg = "Invalid Python format string for query"
raise InterfaceError(msg) from exc
if _operation_ends_transaction(operation):
self._conn._in_transaction = False # txn ends, even on failure
try:
self._hndl.execute(sql, parameters, column_types=column_types)
except cdb2.Error as e:
_raise_wrapped_exception(e)
if operation == "begin":
self._conn._in_transaction = True # txn successfully started
elif not self._conn._in_transaction and _modifies_rows(operation):
# We're not in a transaction, and the last statement could have
# modified rows. Either we've just explicitly committed
# a transaction, or we're in autocommit mode and ran DML outside of
# an explicit transaction. We can get the count of affected rows.
self._update_rowcount()
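The named-placeholder translation in `_execute` is ordinary Python ``%``-formatting: each ``%(name)s`` placeholder is rewritten to Comdb2's ``@name`` bind syntax before the statement reaches the handle. A standalone sketch, with an illustrative statement and parameter names:

```python
# Named placeholders are mapped to "@name" bind references via a dict
# whose values are the bind names rather than the parameter values.
sql = "select * from t where id = %(id)s and grp = %(grp)s"
parameters = {"id": 7, "grp": "a"}
bound_sql = sql % {name: "@" + name for name in parameters}
print(bound_sql)  # select * from t where id = @id and grp = @grp
```

A missing parameter surfaces as a `KeyError` from the ``%`` operator, which is why `_execute` translates that exception into an `InterfaceError`.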
def setoutputsize(self, size: Any, column: int | None = None) -> None:
"""No-op; implemented for PEP-249 compliance."""
self._check_closed()
def _update_rowcount(self):
try:
self._rowcount = self._hndl.get_effects()[0]
except cdb2.Error:
self._rowcount = -1
def _load_description(self):
names = self._hndl.column_names()
types = self._hndl.column_types()
self._description = tuple(
(name, type, None, None, None, None, None)
for name, type in zip(names, types)
)
if not self._description:
self._description = None
def fetchone(self) -> Row | None:
"""Fetch the next row of the current result set.
Returns:
If no rows remain in the current result set, ``None`` is
returned, otherwise the next row of the result set is returned. By
default the row is returned as a `list`, where the elements in the
list correspond to the result row's columns in positional order,
but this can be changed with the `Connection.row_factory` property.
"""
try:
return next(self)
except StopIteration:
return None
def fetchmany(self, n: int | None = None) -> List[Row]:
"""Fetch the next set of rows of the current result set.
Args:
n: Maximum number of rows to be returned. If this argument is not
given, `Cursor.arraysize` is used as the maximum.
Returns:
list: Returns a `list` containing the next ``n`` rows of the
result set. If fewer than ``n`` rows remain, the returned list
will contain fewer than ``n`` elements. If no rows remain, the
list will be empty. By default each row is
returned as a `list`, where the elements in the list correspond to
the result row's columns in positional order, but this can be
changed with the `Connection.row_factory` property.
"""
if n is None:
n = self._arraysize
return [x for x in itertools.islice(self, 0, n)]
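The slicing behavior of `fetchmany` can be demonstrated with a plain iterator standing in for a `Cursor`: ``itertools.islice`` consumes at most ``n`` items and simply yields fewer when the underlying iterator runs out, which is exactly the partial-final-batch behavior documented above. The row values here are illustrative.

```python
import itertools

rows = iter([[1, "a"], [2, "b"], [3, "c"]])
# First call: a full batch of 2 rows.
first_two = list(itertools.islice(rows, 0, 2))
# Second call: only one row remains, so the batch is short.
remainder = list(itertools.islice(rows, 0, 2))
print(first_two)  # [[1, 'a'], [2, 'b']]
print(remainder)  # [[3, 'c']]
```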
def fetchall(self) -> List[Row]:
"""Fetch all remaining rows of the current result set.
Returns:
list: Returns a `list` containing all remaining rows of the
result set. By default each row is returned as a `list`, where the
elements in the list correspond to the result row's columns in
positional order, but this can be changed with the
`Connection.row_factory` property.
"""
return [x for x in self]
# Optional DB API Extension
def __iter__(self) -> Iterator[Row]:
"""Iterate over all rows in a result set.
By default each row is returned as a `list`, where the elements in the
list correspond to the result row's columns in positional order, but
this can be changed with the `Connection.row_factory` property.
Note:
This is not required by DB-API 2.0; for maximum portability
applications should prefer to use `fetchone` or `fetchmany` or
`fetchall` instead.
Example:
>>> cursor.execute("select 1, 2 UNION ALL select 3, 4")
>>> for row in cursor:
... print(row)
[1, 2]
[3, 4]
"""
self._check_closed()
return self
# Optional DB API Extension
def next(self):
self._check_closed()
if not self._description:
raise InterfaceError("No result set exists")
try:
return next(self._hndl)
except cdb2.Error as e:
_raise_wrapped_exception(e)
__next__ = next