A => .gitignore +25 -0
@@ 0,0 1,25 @@
+# binary files
+*.pyc
+bin
+
+# buildout
+.installed.cfg
+.eggs
+.deveggs
+.parts
+.downloads
+
+# temporary build files
+tmp
+build
+dist
+*.egg-info
+
+# IDE files
+.project
+.pydevproject
+.settings
+
+# test files
+var
+
A => .hgignore +27 -0
@@ 0,0 1,27 @@
+syntax: glob
+
+# binary files
+*.pyc
+bin
+
+# buildout
+.installed.cfg
+.eggs
+.deveggs
+.parts
+.downloads
+
+# temporary build files
+tmp
+build
+dist
+*.egg-info
+
+# IDE files
+.project
+.pydevproject
+.settings
+
+# test files
+var
+
A => LICENSE +21 -0
@@ 0,0 1,21 @@
+Copyright (c) 2014 Oben Sonne <obensonne@googlemail.com>
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
A => MANIFEST.in +1 -0
@@ 0,0 1,1 @@
+include README.rst LICENSE
A => README.rst +213 -0
@@ 0,0 1,213 @@
+=====
+Deeno
+=====
+
+Deeno is not an ORM. Deeno is a very thin database abstraction layer.
+
+**Disclaimer:** Deeno development is in a very early stage. Do not yet use it
+in a production environment!
+
+Deeno provides a slick Python API for typical CRUD operations while
+encouraging you to use plain SQL for anything smarter. The rationale is not
+to force object-oriented paradigms onto relational data. If you have ever
+experienced a lot of WTF when reading chained ORM monsters, but are also
+tired of writing plain CRUD statements in raw SQL, then Deeno is for you.
+
+Deeno assumes you define and manage database schemas outside the application
+that interacts with the database. This is typically the case for databases
+used by different applications running in different environments and on
+different platforms. In that case, re-defining a good deal of the schema
+(e.g. primary keys, columns, and their default values) in models would be
+redundant -- DRY. Instead, Deeno collects (and caches) the necessary schema
+information, such as primary keys and relation types, using database
+introspection.
+
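+For example, primary keys are discovered on first access and cached per
+relation (a sketch, assuming the `user` table from the intro below)::
+
+    db.r.user.pk                    # ('user_id',)
+    db.invalidate_relation_cache()  # drop cached schema information
+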
+Currently SQLite and PostgreSQL are supported.
+
+**Reasons for Deeno:**
+
+- You manage the database schema in SQL (i.e. not derived from Python model
+ objects) and you do not want to specify your schema twice.
+- You prefer raw SQL over complex chained model methods where it is not obvious
+ how things map to SQL.
+- You prefer to avoid raw SQL for simple CRUD operations.
+- You do not want your DB layer to be a black box (i.e. it should be easy to
+ guess how Python statements map to SQL statements).
+
+-----------
+Quick Intro
+-----------
+
+Let's set up a test database::
+
+ >>> import os
+ >>> from deeno.db import SQLiteDatabase
+ >>> db = SQLiteDatabase(config={'path': ':memory:'})
+ >>> db.execute("""
+ ... CREATE TABLE customer (
+ ... customer_id INTEGER PRIMARY KEY,
+ ... name TEXT
+ ... )
+ ... """)
+ >>> db.execute("""
+ ... CREATE TABLE user (
+ ... user_id INTEGER PRIMARY KEY,
+ ... name TEXT,
+ ... age INTEGER,
+ ... active BOOLEAN DEFAULT 1,
+ ... customer_id INTEGER REFERENCES customer (customer_id)
+ ... )
+ ... """)
+ >>> db.execute("""
+ ... INSERT INTO user (name, age, active)
+ ... VALUES ('Bob', 28, 1), ('Joe', 32, 0)
+ ... """)
+ 2
+
+You can work with relations without defining a single model::
+
+ >>> user = db.r.user.get(user_id=1)
+ >>> user.name == 'Bob'
+ True
+ >>> user.name = 'Bøb'
+ >>> user.save()
+
+Create a new user::
+
+ >>> user = db.r.user.new(name='Brian', age=48)
+ >>> print('%s has ID %s' % (user.name, user.user_id))
+ Brian has ID 3
+
+The `get` and `new` functions on relations return active-record-like objects.
+That's the most ORM-like behavior of Deeno. For everything else, Deeno merely
+provides convenience functions for performing SQL queries::
+
+ >>> users = db.r.user.select('name', where={'active': True}, limit=10)
+ >>> for user in users:
+ ... print(user.name)
+ Bøb
+ Brian
+
+For where clauses that involve more than equality checks, use parameterized
+SQL fragments::
+
+ >>> users = db.r.user.select('name', where=('active AND age > ?', [40]))
+ >>> for user in users:
+ ... print(user.name)
+ Brian
+
+This is equivalent to::
+
+    >>> users = db.fetchall("SELECT name FROM user WHERE active AND age > ?", [40])
+
+The `select` and `fetchall` functions do not return active records but named
+tuples. IMHO active records do not fit well when working with multiple rows
+from a relation.
+
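+Rows thus behave like regular named tuples (a sketch, not part of the
+doctests)::
+
+    row = db.fetchone("SELECT * FROM user WHERE user_id = ?", [1])
+    row.name        # 'Bøb'
+    row._asdict()   # convert the row to an ordered dictionary
+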
+Joins are written like this::
+
+ >>> users = db.r.user.left_join('customer', using=('customer_id',)).select()
+
+Some more impressions::
+
+ >>> n = db.r.user.update({'active': True}, where=('age < ?', [40]))
+ >>> print('affected rows: %s' % n)
+ affected rows: 2
+    >>> rows = db.r.user.delete(returning='*') # postgres only # doctest: +SKIP
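+
+Transactions are available as a context manager that commits on success and
+rolls back on error (a sketch, not part of the doctests)::
+
+    with db.transaction():
+        db.r.user.update({'active': False}, where={'name': 'Joe'})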
+
+|flattr|
+
+Deeno Highlights:
+
+- no model definitions (deeno uses introspection when needed)
+- easy to execute plain SQL
+- CRUD in Python, fancy stuff in SQL
+
+.. contents::
+ :depth: 1
+ :local:
+
+------------
+Installation
+------------
+
+Deeno requires Python 3.
+
+Run::
+
+ pip install deeno
+
+
+-----
+Usage
+-----
+
+*TODO*
+
+---------
+Resources
+---------
+
+:Releases and documentation: `PyPI`_
+
+:Issues and source code: `BitBucket`_
+
+.. :Source code mirror: `GitHub`_
+
+.. _`PyPI`: http://pypi.python.org/pypi/deeno
+.. _`BitBucket`: https://bitbucket.org/obensonne/deeno
+.. _`GitHub`: https://github.com/obensonne/deeno
+
+-------------
+Contributions
+-------------
+
+To contribute to Deeno, fork the project at `BitBucket`_.
+
+.. or `GitHub`_.
+
+Every fix or new feature should include one or more corresponding test cases.
+Please also `post an issue`_ describing your fix or enhancement.
+
+.. _`post an issue`: https://bitbucket.org/obensonne/deeno/issues
+
+Deeno uses `Buildout`_ to easily set up the development environment.
+Buildout automates downloading and installing the requirements for using and
+developing Deeno. Requirements are installed locally in the project source
+directory, i.e. they do not clutter the system Python installation.
+
+In a fresh source checkout, run::
+
+ $ python3 bootstrap.py
+ $ bin/buildout
+
+When done, the following scripts can be found in the ``bin/`` directory:
+
+``tests``
+ Test runner script (a wrapper for `nose`_).
+
+``fab``
+ `Fabric`_ binary to use for the project's *fabfile*.
+
+``python``
+ A Python interpreter with access to the local development version of
+ the *deeno* module.
+
+.. _`Buildout`: http://www.buildout.org/
+.. _`nose`: http://readthedocs.org/docs/nose/
+.. _`Fabric`: http://fabfile.org/
+
+-------
+Changes
+-------
+
+Version 0.1
+~~~~~~~~~~~
+
+- Initial release.
+
+.. ......................................................................... ..
+
+.. |flattr| image:: https://api.flattr.com/button/flattr-badge-large.png
+ :alt: Flattr this
+ :target: https://flattr.com/submit/auto?user_id=obs&url=https%3A%2F%2Fbitbucket.org%2Fobensonne%2Fdeeno
A => bootstrap.py +170 -0
@@ 0,0 1,170 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Bootstrap a buildout-based project
+
+Simply run this script in a directory containing a buildout.cfg.
+The script accepts buildout command-line options, so you can
+use the -c option to specify an alternate configuration file.
+"""
+
+import os
+import shutil
+import sys
+import tempfile
+
+from optparse import OptionParser
+
+tmpeggs = tempfile.mkdtemp()
+
+usage = '''\
+[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
+
+Bootstraps a buildout-based project.
+
+Simply run this script in a directory containing a buildout.cfg, using the
+Python that you want bin/buildout to use.
+
+Note that by using --find-links to point to local resources, you can keep
+this script from going over the network.
+'''
+
+parser = OptionParser(usage=usage)
+parser.add_option("-v", "--version", help="use a specific zc.buildout version")
+
+parser.add_option("-t", "--accept-buildout-test-releases",
+ dest='accept_buildout_test_releases',
+ action="store_true", default=False,
+ help=("Normally, if you do not specify a --version, the "
+ "bootstrap script and buildout gets the newest "
+ "*final* versions of zc.buildout and its recipes and "
+ "extensions for you. If you use this flag, "
+ "bootstrap and buildout will get the newest releases "
+ "even if they are alphas or betas."))
+parser.add_option("-c", "--config-file",
+ help=("Specify the path to the buildout configuration "
+ "file to be used."))
+parser.add_option("-f", "--find-links",
+ help=("Specify a URL to search for buildout releases"))
+
+
+options, args = parser.parse_args()
+
+######################################################################
+# load/install setuptools
+
+to_reload = False
+try:
+ import pkg_resources
+ import setuptools
+except ImportError:
+ ez = {}
+
+ try:
+ from urllib.request import urlopen
+ except ImportError:
+ from urllib2 import urlopen
+
+ # XXX use a more permanent ez_setup.py URL when available.
+ exec(urlopen('https://bitbucket.org/pypa/setuptools/raw/0.7.2/ez_setup.py'
+ ).read(), ez)
+ setup_args = dict(to_dir=tmpeggs, download_delay=0)
+ ez['use_setuptools'](**setup_args)
+
+ if to_reload:
+ reload(pkg_resources)
+ import pkg_resources
+ # This does not (always?) update the default working set. We will
+ # do it.
+ for path in sys.path:
+ if path not in pkg_resources.working_set.entries:
+ pkg_resources.working_set.add_entry(path)
+
+######################################################################
+# Install buildout
+
+ws = pkg_resources.working_set
+
+cmd = [sys.executable, '-c',
+ 'from setuptools.command.easy_install import main; main()',
+ '-mZqNxd', tmpeggs]
+
+find_links = os.environ.get(
+ 'bootstrap-testing-find-links',
+ options.find_links or
+ ('http://downloads.buildout.org/'
+ if options.accept_buildout_test_releases else None)
+ )
+if find_links:
+ cmd.extend(['-f', find_links])
+
+setuptools_path = ws.find(
+ pkg_resources.Requirement.parse('setuptools')).location
+
+requirement = 'zc.buildout'
+version = options.version
+if version is None and not options.accept_buildout_test_releases:
+ # Figure out the most recent final version of zc.buildout.
+ import setuptools.package_index
+ _final_parts = '*final-', '*final'
+
+ def _final_version(parsed_version):
+ for part in parsed_version:
+ if (part[:1] == '*') and (part not in _final_parts):
+ return False
+ return True
+ index = setuptools.package_index.PackageIndex(
+ search_path=[setuptools_path])
+ if find_links:
+ index.add_find_links((find_links,))
+ req = pkg_resources.Requirement.parse(requirement)
+ if index.obtain(req) is not None:
+ best = []
+ bestv = None
+ for dist in index[req.project_name]:
+ distv = dist.parsed_version
+ if _final_version(distv):
+ if bestv is None or distv > bestv:
+ best = [dist]
+ bestv = distv
+ elif distv == bestv:
+ best.append(dist)
+ if best:
+ best.sort()
+ version = best[-1].version
+if version:
+ requirement = '=='.join((requirement, version))
+cmd.append(requirement)
+
+import subprocess
+if subprocess.call(cmd, env=dict(os.environ, PYTHONPATH=setuptools_path)) != 0:
+ raise Exception(
+ "Failed to execute command:\n%s",
+ repr(cmd)[1:-1])
+
+######################################################################
+# Import and run buildout
+
+ws.add_entry(tmpeggs)
+ws.require(requirement)
+import zc.buildout.buildout
+
+if not [a for a in args if '=' not in a]:
+ args.append('bootstrap')
+
+# if -c was provided, we push it back into args for buildout' main function
+if options.config_file is not None:
+ args[0:0] = ['-c', options.config_file]
+
+zc.buildout.buildout.main(args)
+shutil.rmtree(tmpeggs)
A => buildout.cfg +54 -0
@@ 0,0 1,54 @@
+[buildout]
+parts = deeno tests tools fabric
+develop = .
+unzip = true
+
+newest = false
+
+eggs-directory = .eggs
+develop-eggs-directory = .deveggs
+parts-directory = .parts
+
+[deeno]
+recipe = zc.recipe.egg
+interpreter = python
+eggs = psycopg2
+
+[tests]
+recipe = pbp.recipe.noserunner
+script = tests
+defaults =
+ --with-doctest
+ --doctest-extension .rst
+ . ../README.rst
+eggs =
+ psycopg2
+ deeno
+ mock
+working-directory = ${buildout:directory}/src
+
+[fabric]
+recipe = zc.recipe.egg
+eggs =
+ deeno
+ docutils
+ fabric
+
+
+[tools]
+recipe = zc.recipe.egg:scripts
+eggs =
+ pep8
+extra-paths = ${buildout:directory}/src
+entry-points =
+ style=tools.style:main
+
+# -----------------------------------------------------------------------------
+# Documentation
+# -----------------------------------------------------------------------------
+
+# Buildout: http://www.buildout.org/
+# zc.buildout: http://pypi.python.org/pypi/zc.buildout
+# zc.recipe.egg: http://pypi.python.org/pypi/zc.recipe.egg
+# pbp.recipe.noserunner: http://pypi.python.org/pypi/pbp.recipe.noserunner
+
A => fabfile.py +103 -0
@@ 0,0 1,103 @@
+"""Fabfile for release management."""
+
+import codecs
+import glob
+import os
+import re
+
+from fabric.api import local, abort, lcd
+
+HERE = os.path.dirname(__file__)
+ROOT = HERE
+
+# =============================================================================
+# internal helpers
+# =============================================================================
+
+def _readfile(fname, strip="\n"):
+ """Shortcut for reading a text file."""
+
+ with codecs.open(fname, 'r', 'UTF8') as fp:
+ content = fp.read()
+ return content.strip(strip) if strip else content
+
+def _contains(fname, rx, reflags=0):
+ """Check if content of `fname` matches (contains) `rx`."""
+
+ content = _readfile(fname)
+ return re.search(rx, content, reflags)
+
+def _needcleanworkingcopy():
+ """Aborts if working copy is dirty."""
+
+ if local("hg status -n", capture=True):
+ abort("dirty working copy")
+
+# =============================================================================
+# release tools
+# =============================================================================
+
+def push():
+ """Push master branch."""
+
+ _needcleanworkingcopy()
+
+ hgignore = _readfile(".hgignore").split("\n")[2:]
+ gitignore = _readfile(".gitignore").split("\n")
+ if hgignore != gitignore:
+ abort("hg and git ignore files differ")
+
+ local("hg push -r master bitbucket")
+ #local("hg push -r master github")
+
+def release_check(version):
+ """Various checks to be done prior a release."""
+
+ # --- check README --------------------------------------------------------
+
+ from docutils.core import publish_cmdline
+ readme = os.path.join(ROOT, "README.rst")
+ publish_cmdline(argv=["--halt", "2", readme, os.devnull])
+
+ # --- version numbers -----------------------------------------------------
+
+ rx = r'\n--+\nChanges\n--+\n\nVersion %s\n~~+\n\n' % version
+ if not _contains(readme, rx):
+ abort("bad version in README.rst")
+
+ setup = os.path.join(ROOT, "setup.py")
+ rx = r'''\nversion *= *['"]%s['"]\n''' % version
+ if not _contains(setup, rx):
+ abort("bad version in setup.py")
+
+ # --- run tests -----------------------------------------------------------
+
+ local(os.path.join(ROOT, "bin", "tests"))
+
+    # --- check working copy --------------------------------------------------
+
+ _needcleanworkingcopy()
+
+ out = local("hg bookmarks", capture=True)
+ if not re.match(r'\s*\* master\s', out):
+ abort("working copy is not at master bookmark")
+
+def release(version):
+ """Make a release."""
+
+ local("bin/buildout")
+
+ release_check(version)
+
+ local("rm -rf %s" % os.path.join(ROOT, "build"))
+ local("rm -rf %s" % os.path.join(ROOT, "dist"))
+
+ local("bin/buildout setup %s clean build sdist" % ROOT) # test build
+
+ local("hg tag %s" % version)
+
+ local("bin/buildout setup %s register sdist upload" % ROOT)
+
+ push()
A => setup.py +39 -0
@@ 0,0 1,39 @@
+import sys, os
+
+from setuptools import setup, find_packages
+
+if sys.version_info.major < 3:
+ raise RuntimeError('deeno needs Python 3')
+
+here = os.path.abspath(os.path.dirname(__file__))
+README = open(os.path.join(here, 'README.rst')).read()
+
+version = '0.1'
+
+install_requires = []
+
+setup(name='deeno',
+ version=version,
+ description="Deeno is not an ORM.",
+ long_description=README,
+ classifiers=[
+ 'Development Status :: 3 - Alpha',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: MIT License',
+ 'Natural Language :: English',
+ 'Programming Language :: Python :: 3',
+ 'Topic :: Database :: Front-Ends',
+ 'Topic :: Software Development :: Libraries',
+ ],
+ keywords='database orm sql postgresql sqlite',
+ author='Oben Sonne',
+ author_email='obensonne@googlemail.com',
+ url='http://pypi.python.org/pypi/deeno',
+ license='MIT',
+ packages=find_packages('src', exclude=['tests', 'tools']),
+ package_dir={'': 'src'},
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=install_requires,
+ entry_points={},
+)
A => src/deeno/__init__.py +5 -0
@@ 0,0 1,5 @@
+"""Deeno is not an ORM."""
+
+
+class DeenoException(Exception):
+ pass
A => src/deeno/db.py +464 -0
@@ 0,0 1,464 @@
+from functools import partial, wraps
+from contextlib import contextmanager
+import logging
+import re
+import sqlite3
+import threading
+from collections import namedtuple
+
+from deeno import sql, DeenoException
+
+try:
+ import psycopg2
+ from psycopg2.extras import NamedTupleCursor
+except ImportError as e:
+ psycopg2 = e
+
+log = logging.getLogger(__name__)
+
+
+class NoMatchingRecord(DeenoException):
+ pass
+
+
+class Record(dict):
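+    """Active-record-like dictionary bound to a table.
+
+    Items are accessible as attributes; primary key values are immutable.
+    """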
+
+ def __init__(self, table, *args, **kwargs):
+ super(Record, self).__init__(*args, **kwargs)
+ self.__dict__['_table'] = table
+ assert set(table.pk).issubset(self)
+
+ def __getattr__(self, name):
+ try:
+ return self.__getitem__(name)
+ except KeyError:
+ raise AttributeError(name)
+
+ def __setattr__(self, name, value):
+ if name in self.__dict__:
+ self.__dict__[name] = value
+ else:
+ self.__setitem__(name, value)
+
+ def __delattr__(self, name):
+ if name in self.__dict__:
+ self.__dict__.__delitem__(name)
+ else:
+ self.__delitem__(name)
+
+ def __setitem__(self, name, value):
+ if name in self._table.pk:
+ raise AttributeError('primary key values are immutable')
+ super(Record, self).__setitem__(name, value)
+
+    def __delitem__(self, name):
+ raise AttributeError('record attributes cannot be removed')
+
+ def save(self):
+ if not self._table.pk:
+ raise AttributeError('only records from tables with a primary key '
+ 'can be saved')
+ if set(self._table.pk) == set(self.keys()):
+ log.debug('nothing to save')
+ return
+ where, values = {}, {}
+ for k, v in self.items():
+ if k in self._table.pk:
+ where[k] = v
+ else:
+ values[k] = v
+ self._table.update(values, where=where)
+
+ def delete(self):
+ if not self._table.pk:
+ raise AttributeError('only records from tables with a primary key '
+ 'can be deleted')
+ where = dict((k, self[k]) for k in self._table.pk)
+ self._table.delete(where=where)
+
+
+def table_only_operation(method):
+
+ @wraps(method)
+ def wrapper(relation, *args, **kwargs):
+ if relation.type != 'table':
+ raise AttributeError('invoked table-only operation %r on '
+ 'non-table relation %r' % (method.__name__, relation.name))
+ return method(relation, *args, **kwargs)
+
+ return wrapper
+
+
+def primary_key_only_operation(method):
+
+ @wraps(method)
+ def wrapper(relation, *args, **kwargs):
+ if not relation.pk:
+            raise AttributeError('invoked primary-key-requiring operation %r '
+                'on relation without a primary key' % method.__name__)
+ return method(relation, *args, **kwargs)
+
+ return wrapper
+
+
+class Relation(object):
+
+ _rx_join = re.compile(r'(?:(?:cross_)|'
+ '(?:(?:natural_)?(?:inner_)?(?:left_|right_|full_)?(?:outer_)?)'
+ ')join$', re.I)
+
+ def __init__(self, db, name, rtype, pk):
+ self.db = db
+ self.name = name if rtype == 'join' else db.quote(name)
+ self.type = rtype
+ self.pk = pk or ()
+
+ def __str__(self):
+ return self.name
+
+ def get(self, **kwargs):
+ try:
+ row = next(self.select(where=kwargs))
+ except StopIteration:
+ raise NoMatchingRecord(kwargs)
+ return Record(self, **row._asdict())
+
+ def select(self, *args, **kwargs):
+ q, p = sql.select(self, *args, **kwargs)
+ rows = self.db.fetchall(q, p)
+ return rows
+
+ @table_only_operation
+ @primary_key_only_operation
+ def new(self, **kwargs):
+ row = self.db.insert_and_return_row(self, kwargs)
+ return Record(self, **row._asdict())
+
+ @table_only_operation
+ def delete(self, *args, **kwargs):
+ q, p = sql.delete(self, *args, **kwargs)
+ if kwargs.get('returning'):
+ result = self.db.fetchall(q, p)
+ else:
+ result = self.db.execute(q, p)
+ return result
+
+ @table_only_operation
+ def update(self, *args, **kwargs):
+ q, p = sql.update(self, *args, **kwargs)
+ if kwargs.get('returning'):
+ result = self.db.fetchall(q, p)
+ else:
+ result = self.db.execute(q, p)
+ return result
+
+ @table_only_operation
+ def insert(self, *args, **kwargs):
+ q, p = sql.insert(self, *args, **kwargs)
+ if kwargs.get('returning'):
+ result = self.db.fetchall(q, p)
+ else:
+ result = self.db.execute(q, p)
+ self.db._tl.last_insert_table = self
+ return result
+
+ def __getattr__(self, name):
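+        # Synthesize join methods such as `join`, `left_join`, or
+        # `natural_inner_join` (see `_rx_join`).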
+ if self._rx_join.match(name):
+ mode = name.replace('_', ' ').upper()
+ return partial(self._join, mode)
+ raise AttributeError(name)
+
+ def _join(self, mode, name, on=None, using=None):
+ name = self.db.quote(name)
+ name = sql.join(self, mode, name, on, using)
+ return Relation(self.db, name, 'join', None)
+
+
+class Relatiomat(object):
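+    """Factory and cache for `Relation` objects.
+
+    Relations are accessible by attribute, item, or call syntax;
+    schema-qualified names may be written as ``schema.table`` or
+    ``schema__table``.
+    """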
+
+ def __init__(self, db):
+ self.__db = db
+ self._cache = {}
+
+ def __get_relation(self, name):
+ name = '.'.join(name.split('__', 1))
+ try:
+ return self._cache[name]
+ except KeyError:
+ rtype, pk = self.__db.get_relation_info(name)
+ r = Relation(self.__db, name, rtype, pk or ())
+ self._cache[name] = r
+ return r
+
+ __call__ = __get_relation
+ __getattr__ = __get_relation
+ __getitem__ = __get_relation
+
+
+def logquery(meth):
+
+ rx_strip_query = re.compile(r' *\n *')
+
+ @wraps(meth)
+ def wrapper(db, query, *args):
+ stripped = rx_strip_query.sub(' ', query.strip())
+ params, = args or ((),)
+ log.debug('%s: %s %r' % (meth.__name__, stripped, params))
+ return meth(db, query, *args)
+
+ return wrapper
+
+
+class Database(object):
+
+ thread_local_config = False
+
+ placeholder = None
+ quote_char = '"'
+ schema_separator_char = None
+ script_func = 'execute'
+
+ def __init__(self, config=None):
+ self.config = config
+ self._tl = threading.local()
+
+ def connect(self):
+ raise NotImplementedError
+
+ def disconnect(self):
+ c = getattr(self._tl, 'connection', None)
+ if c is not None:
+ c.close()
+ self._tl.connection = None
+
+ def invalidate_relation_cache(self):
+ """Invalidate the relation meta information cache.
+
+ To be called when the database schema context or primary keys of tables
+ have changed so that the cached meta information for relations may not
+ be valid anymore.
+
+ Note that this only affects the current thread's relation info cache.
+
+ """
+ self.relatiomat._cache.clear()
+
+ @property
+ def relatiomat(self):
+ if getattr(self._tl, 'relatiomat', None) is None:
+ self._tl.relatiomat = Relatiomat(self)
+ return self._tl.relatiomat
+
+ r = relatiomat
+ t = relatiomat
+
+ @property
+ def connection(self):
+ if getattr(self._tl, 'connection', None) is None:
+ self.connect()
+ return self._tl.connection
+
+ def get_relation_info(self, name):
+ raise NotImplementedError
+
+ def begin(self):
+ self.execute('BEGIN')
+
+ def rollback(self):
+ self.connection.rollback()
+
+ def commit(self):
+ self.connection.commit()
+
+ @logquery
+ def execute(self, query, params=()):
+ c = self.connection.cursor()
+ c.execute(query, params)
+ return None if c.rowcount == -1 else c.rowcount
+
+ @logquery
+ def script(self, query):
+ """Execute a parameterless query with multiple SQL statements
+
+ This is only useful if a subclass' `execute` does not allow multiple
+ statements in queries (e.g. `SQLiteDatabase`).
+
+ """
+ c = self.connection.cursor()
+ f = getattr(c, self.script_func)
+ f(query)
+
+ @logquery
+ def fetchall(self, query, params=()):
+ c = self.connection.cursor()
+ c.execute(query, params)
+ return iter(c)
+
+ @logquery
+ def fetchone(self, query, params=()):
+ c = self.connection.cursor()
+ c.execute(query, params)
+ return c.fetchone()
+
+ @contextmanager
+ def transaction(self):
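+        """Run a block of statements in a transaction.
+
+        Commits on success and rolls back on any exception. Usage sketch::
+
+            with db.transaction():
+                db.execute(...)
+        """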
+ self.begin()
+ try:
+ yield
+ except:
+ self.rollback()
+ raise
+ self.commit()
+
+ def quote(self, s):
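+        """Quote identifier `s`, quoting schema and relation separately in
+        schema-qualified names."""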
+
+ qc = self.quote_char
+ ssc = self.schema_separator_char or ''
+ parts = s.split(ssc) if ssc else [s]
+ parts = ['%s%s%s' % (qc, part, qc) for part in parts]
+ return ssc.join(parts)
+
+ def insert_and_return_row(self, relation, row):
+ raise NotImplementedError
+
+
+class SQLiteDatabase(Database):
+
+ placeholder = '?'
+ script_func = 'executescript'
+
+ def __init__(self, *args, **kwargs):
+ super(SQLiteDatabase, self).__init__(*args, **kwargs)
+ self._row_types = {}
+
+ def _row_factory(self, cursor, row):
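+        # Build one namedtuple type per result shape and cache it, keyed by
+        # the cursor description.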
+ try:
+ row_type = self._row_types[cursor.description]
+ except KeyError:
+ colnames = tuple(x[0] for x in cursor.description)
+ row_type = namedtuple('DeenoSqliteRow', colnames)
+ self._row_types[cursor.description] = row_type
+ return row_type._make(row)
+
+ def connect(self):
+ self._tl.connection = sqlite3.connect(self.config['path'])
+ self._tl.connection.row_factory = self._row_factory
+ self._tl.connection.isolation_level = None
+
+ def get_relation_info(self, name):
+
+ q = 'SELECT type FROM sqlite_master WHERE name = ?'
+ p = [name]
+ r = self.fetchone(q, p)
+ if r is None:
+ raise ValueError('no such relation: %r' % name)
+ if r.type not in ('table', 'view'):
+ raise ValueError('unknown relation type: %r (%r)' % (r.type, name))
+
+ pk = None
+ if r.type == 'table':
+ q = 'pragma table_info(%s)' % self.quote(name)
+ rows = self.fetchall(q)
+ pk = tuple(row.name for row in rows if row.pk)
+
+ return r.type, pk
+
+ def insert_and_return_row(self, relation, row):
+ q, p = sql.insert(relation, row=row)
+ c = self.connection.cursor()
+ c.execute(q, p)
+ rowid = c.lastrowid
+ rows = relation.select(where={'rowid': rowid})
+ return next(rows)
+
+
+class PostgresDatabase(Database):
+
+ placeholder = '%s'
+ schema_separator_char = '.'
+
+ def __init__(self, *args, **kwargs):
+ if isinstance(psycopg2, ImportError):
+ raise psycopg2
+ super(PostgresDatabase, self).__init__(*args, **kwargs)
+
+ def connect(self):
+ self._tl.connection = psycopg2.connect(host=self.config['host'],
+ database=self.config['database'], user=self.config['user'])
+ self._tl.connection.cursor_factory = NamedTupleCursor
+ self._tl.connection.autocommit = True
+
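+    # Explicit transactions are emulated by toggling psycopg2's autocommit
+    # flag: `begin` switches it off; `commit` and `rollback` finish the
+    # transaction and switch it back on.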
+ def begin(self):
+ self.connection.autocommit = False
+
+ def rollback(self):
+ self.connection.rollback()
+ self.connection.autocommit = True
+
+ def commit(self):
+ self.connection.commit()
+ self.connection.autocommit = True
+
+ def get_relation_info(self, name):
+
+ if '.' in name:
+ schema, name = name.split('.', 1)
+ schemas = [schema]
+ else:
+ q = "SHOW search_path"
+ row = self.fetchone(q)
+ schemas = row[0].replace(',', ' ').split()
+
+ log.debug('look for %r in schemas %r' % (name, schemas))
+
+ # Since the same base relation name may occur in multiple schemas, we
+ # have to walk the search path to find the correct schema for a
+ # relation.
+
+ rtype, pk = None, None
+ for schema in schemas:
+ p = [schema, name]
+ q = """
+ SELECT 1 AS x FROM pg_catalog.pg_views
+ WHERE schemaname = %s AND viewname = %s
+ """
+ row = self.fetchone(q, p)
+ if row:
+ rtype = 'view'
+ break
+ q = """
+ SELECT 1 AS x FROM pg_catalog.pg_tables
+ WHERE schemaname = %s AND tablename = %s
+ """
+ row = self.fetchone(q, p)
+ if row:
+ rtype = 'table'
+ break
+
+ if rtype is None:
+ raise ValueError('no view/table %r in current search path' % name)
+
+ if rtype == 'table':
+ q = """
+ SELECT
+ a.attname
+ FROM
+ pg_catalog.pg_attribute a,
+ pg_catalog.pg_class c,
+ pg_catalog.pg_index i
+ WHERE
+ c.oid = %s::regclass AND
+ indrelid = c.oid AND
+ a.attrelid = c.oid AND
+ a.attnum = any(i.indkey)
+ AND indisprimary
+ """
+ p = [self.quote('%s.%s' % (schema, name))]
+ rows = self.fetchall(q, p)
+ pk = tuple(r[0] for r in rows)
+
+ return rtype, pk
+
+ def insert_and_return_row(self, relation, row):
+ rows = relation.insert(row=row, returning='*')
+ return next(rows)
A => src/deeno/sql.py +188 -0
@@ 0,0 1,188 @@
+"""
+SQL query builder.
+
+The parameter `relation` generally refers to a `deeno.db.Relation` instance.
+
+"""
+import logging
+
+log = logging.getLogger(__name__)
+
+
+def _append_where(db, query, params, where):
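+    # `where` is either a dict of column/value equality pairs (ANDed
+    # together) or a (sql_fragment, params) tuple.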
+
+ if not where:
+ return
+
+ if isinstance(where, dict):
+ where_clause, where_params = [], []
+ for column, value in sorted(where.items()):
+ column = db.quote(column)
+ where_clause.append('%s = %s' % (column, db.placeholder))
+ where_params.append(value)
+ where_clause = ' AND '.join(where_clause)
+ else:
+ where_clause, where_params = where
+ query.append('WHERE')
+ query.append(where_clause)
+ params.extend(where_params)
+
+
+def _append_returning(db, query, returning):
+
+ if returning:
+ if not isinstance(returning, str):
+ returning = ', '.join(db.quote(c) for c in returning)
+ query.append('RETURNING')
+ query.append(returning)
+
+
+def join(relation, mode, other, on=None, using=None):
+
+ query = ['%s %s %s' % (relation, mode.upper(), other)]
+ if on:
+ query.append('ON %s' % (on,))
+ if using:
+ if not isinstance(using, str):
+ using = ', '.join(relation.db.quote(c) for c in using)
+ query.append('USING (%s)' % (using,))
+ return ' '.join(query)
+
+
+def select(relation, columns='*', where=None, groupby=None, having=None,
+ orderby=None, limit=None, offset=None):
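+    # Example (with placeholder '%s'):
+    #   select(rel, ('a',), where={'b': 1}, limit=5)
+    #   -> ('SELECT "a" FROM "rel" WHERE "b" = %s LIMIT %s', [1, 5])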
+
+ if not isinstance(columns, str):
+ columns = ', '.join(relation.db.quote(c) for c in columns)
+
+ query = ["SELECT %s FROM %s" % (columns, relation)]
+ params = []
+
+ _append_where(relation.db, query, params, where)
+
+ if groupby:
+ if not isinstance(groupby, str):
+ groupby = ', '.join(groupby)
+ query.append('GROUP BY')
+ query.append(groupby)
+
+ if having:
+ query.append('HAVING')
+ query.append(having)
+
+ if orderby:
+ if not isinstance(orderby, str):
+ orderby = ', '.join(
+ relation.db.quote(c) if isinstance(c, str) else str(c)
+ for c in orderby
+ )
+ query.append('ORDER BY')
+ query.append(orderby)
+
+    # Emit LIMIT before OFFSET: SQLite only accepts this order, while
+    # PostgreSQL accepts both.
+    if limit:
+        query.append('LIMIT %s' % (relation.db.placeholder,))
+        params.append(limit)
+
+    if offset:
+        query.append('OFFSET %s' % (relation.db.placeholder,))
+        params.append(offset)
+
+ return ' '.join(query), params
+
+
+def update(relation, assign, where=None, returning=None):
+
+ if not assign:
+ raise ValueError('need something to set')
+
+ query = ["UPDATE %s SET" % (relation,)]
+ params = []
+
+ if isinstance(assign, dict):
+ assign_clause, assign_params = [], []
+ for k, v in sorted(assign.items()):
+ k = relation.db.quote(k)
+ assign_clause.append('%s = %s' % (k, relation.db.placeholder))
+ assign_params.append(v)
+ assign_clause = ', '.join(assign_clause)
+ else:
+ assign_clause, assign_params = assign
+ query.append(assign_clause)
+ params.extend(assign_params)
+
+ _append_where(relation.db, query, params, where)
+ _append_returning(relation.db, query, returning)
+
+ return ' '.join(query), params
+
+
+def delete(relation, where=None, returning=None):
+
+ query = ["DELETE FROM %s" % (relation,)]
+ params = []
+
+ _append_where(relation.db, query, params, where)
+ _append_returning(relation.db, query, returning)
+
+ return ' '.join(query), params
+
+
+def insert(relation, columns=None, rows=None, row=None, returning=False):
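+    # Calling conventions (summary):
+    #   insert(rel)                                  -> INSERT ... DEFAULT VALUES
+    #   insert(rel, row={'a': 1})                    -> one dictionary row
+    #   insert(rel, columns=('a',), rows=[[1], [2]]) -> rows as sequences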
+
+ if rows and row:
+ raise ValueError('only one of `rows` or `row` may be specified')
+
+ if rows is None and row is None:
+ if columns:
+ raise ValueError('need some rows')
+ rows = [[]]
+ columns = ()
+
+ if rows is None:
+ rows = [row]
+
+ if columns is None:
+ if not isinstance(rows[0], dict):
+            raise ValueError('either provide columns or dictionary rows')
+ columns = tuple(sorted(rows[0].keys()))
+ if set(tuple(sorted(r.keys())) for r in rows) != set([columns]):
+ raise ValueError('dictionary rows must have identical keys')
+ rows = [[r[k] for k in columns] for r in rows]
+ else:
+ if isinstance(columns, str):
+ columns = tuple(columns.replace(',', ' ').split())
+ if isinstance(rows[0], dict):
+ raise ValueError('either provide columns or dict rows, not both')
+
+ nc = len(columns)
+
+ if len(set(len(r) for r in rows)) > 1:
+ raise ValueError('rows must have same number of elements')
+
+ if len(rows[0]) != nc:
+ raise ValueError('number of columns and row elements differ')
+
+ if not isinstance(columns, str):
+ columns = ', '.join(relation.db.quote(c) for c in columns)
+
+ if nc == 0:
+ if len(rows) > 1:
+ raise ValueError('insert with defaults only works for one row')
+ query = ['INSERT INTO %s DEFAULT VALUES' % relation]
+ if returning:
+ _append_returning(relation.db, query, returning)
+ return ' '.join(query), []
+
+ query = ['INSERT INTO %s (%s) VALUES' % (relation, columns)]
+ params = []
+
+ placeholders = ['(%s)' % (', '.join([relation.db.placeholder] * nc))]
+ placeholders = placeholders * len(rows)
+ placeholders = ', '.join(placeholders)
+ query.append(placeholders)
+ for r in rows:
+ params.extend(r)
+
+ _append_returning(relation.db, query, returning)
+
+ return ' '.join(query), params
A => src/deeno/util.py +15 -0
@@ 0,0 1,15 @@
+from functools import wraps
+
+
+def cachedproperty(meth):
+
+ name = meth.__name__
+ key = '_%s_cached_' % name
+
+ @wraps(meth)
+ def wrapper(obj):
+ if not hasattr(obj, key):
+ setattr(obj, key, meth(obj))
+ return getattr(obj, key)
+
+ return property(wrapper)
A => src/tests/__init__.py +21 -0
@@ 0,0 1,21 @@
+import mock
+
+from deeno.db import Database
+
+
+def MockedDatabase():
+
+ db = Database()
+ db.placeholder = '%s'
+ db.quote_char = '"'
+ db.schema_separator_char = '.'
+ db.begin = mock.Mock()
+ db.rollback = mock.Mock()
+ db.commit = mock.Mock()
+ db.execute = mock.Mock()
+ db.fetchall = mock.Mock()
+ db.fetchone = mock.Mock()
+ db.get_relation_info = mock.Mock()
+ db.insert_and_return_row = mock.Mock()
+ return db
\ No newline at end of file
A => src/tests/ressources/postgresql.conf +42 -0
@@ 0,0 1,42 @@
+# - Connection Settings -
+listen_addresses = '' # none, enable unix domain sockets only
+unix_socket_directories = '.'
+unix_socket_group = ''
+unix_socket_permissions = 0777
+
+external_pid_file = './main.pid'
+
+# - Memory -
+fsync = off # !! not needed for tests
+full_page_writes = off
+shared_buffers = 20MB
+wal_buffers = 1MB
+checkpoint_segments = 1
+
+# - Logging -
+# loglevel values in order of decreasing detail:
+# debug5, debug4, debug3, debug2, debug1, log, notice, warning, error
+log_destination = 'stderr'
+logging_collector = on
+log_directory = 'pg_log'
+log_filename = 'postgresql-%Y-%m-%d.log'
+log_line_prefix = '%m %p '
+
+client_min_messages = warning
+log_error_verbosity = default
+log_min_error_statement = error
+# uncomment the following lines for comprehensive debug output
+#log_min_messages = DEBUG1
+#log_min_duration_statement = 0
+#log_connections = on
+#log_duration = on
+
+datestyle = 'iso, mdy'
+timezone = 'GMT'
+
+lc_messages = 'en_US.UTF-8'
+lc_monetary = 'en_US.UTF-8'
+lc_numeric = 'en_US.UTF-8'
+lc_time = 'en_US.UTF-8'
+
+max_connections = 20
A => src/tests/test_db.py +727 -0
@@ 0,0 1,727 @@
+import os
+import unittest
+from collections import namedtuple
+import subprocess
+import shutil
+import time
+
+import psycopg2
+
+from nose.tools import eq_, ok_, raises
+
+import mock
+
+from deeno.db import (Record, Relation, Relatiomat, NoMatchingRecord,
+ SQLiteDatabase, PostgresDatabase)
+
+from tests import MockedDatabase
+
+
+def make_row(**kwargs):
+ Row = namedtuple('Row', ','.join(sorted(kwargs)))
+ return Row(**kwargs)
+
+
+class DbTest(unittest.TestCase):
+
+ def setUp(self):
+
+ self.db = MockedDatabase()
+ self.rel = Relation(self.db, 't1', 'table', ('x',))
+
+
+class RecordTest(DbTest):
+
+ def test_attribute_interface(self):
+
+ r = Record(self.rel, x=1, y='a')
+
+ # attribute style access to items
+ eq_(r.x, 1)
+ eq_(r.y, 'a')
+
+ # regular dictionary interface:
+ eq_(sorted(r.items()), [('x', 1), ('y', 'a')])
+ eq_(sorted(r.keys()), ['x', 'y'])
+ eq_(r['x'], 1)
+ eq_(r['y'], 'a')
+
+ # setting new attributes effectively sets dict items:
+ r.z = 3
+ eq_(sorted(r.items()), [('x', 1), ('y', 'a'), ('z', 3)])
+
+ # setting existing attributes in fact sets the attributes:
+ eq_(r._table, self.rel)
+ r._table = 'foo'
+ eq_(r._table, 'foo')
+ ok_('_table' not in r.keys())
+
+ def test_init(self):
+
+ rec1 = Record(self.rel, x=1, y=2)
+ eq_(rec1.x, 1)
+ eq_(rec1.y, 2)
+
+ rec2 = Record(self.rel, {'x': 1, 'y': 2})
+        eq_(rec2.x, 1)
+        eq_(rec2.y, 2)
+
+ rec3 = Record(self.rel, (('x', 1), ('y', 2)))
+ eq_(rec1, rec2)
+ eq_(rec1, rec3)
+
+ @raises(AttributeError)
+ def test_save_no_primary_key(self):
+
+ self.rel.pk = ()
+ r = Record(self.rel, x=1, y=2)
+ r.save()
+
+ def test_save_success_single_primary_key(self):
+
+ rec = Record(self.rel, x=0, y=1)
+ rec.save()
+ q, p = 'UPDATE "t1" SET "y" = %s WHERE "x" = %s', [1, 0]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ rec.y = 2
+ rec.save()
+ q, p = 'UPDATE "t1" SET "y" = %s WHERE "x" = %s', [2, 0]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ def test_save_success_composite_primary_key(self):
+
+ self.rel.pk = ('x', 'y')
+ rec = Record(self.rel, x=0, y=1, z=2)
+ rec.save()
+ q = 'UPDATE "t1" SET "z" = %s WHERE "x" = %s AND "y" = %s'
+ p = [2, 0, 1]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ rec.z = 3
+ rec.save()
+ q = 'UPDATE "t1" SET "z" = %s WHERE "x" = %s AND "y" = %s'
+ p = [3, 0, 1]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ def test_save_only_primary_key(self):
+
+ rec = Record(self.rel, x=0)
+ rec.save()
+ eq_(self.db.execute.call_count, 0)
+
+ @raises(AttributeError)
+ def test_delete_missing_primary_key(self):
+
+ self.rel.pk = ()
+ rec = Record(self.rel, x=0, y=1)
+ rec.delete()
+
+ @raises(AttributeError)
+ def test_change_primary_key(self):
+
+ rec = Record(self.rel, x=0, y=1)
+ rec.x = 1
+
+ def test_delete_success_single_primary_key(self):
+
+ rec = Record(self.rel, x=0, y=1)
+ rec.delete()
+ eq_(self.db.execute.mock_calls[0],
+ mock.call('DELETE FROM "t1" WHERE "x" = %s', [0]))
+
+ def test_delete_success_composite_primary_key(self):
+
+ self.rel.pk = ('x', 'y')
+ rec = Record(self.rel, x=0, y=1, z=2)
+ rec.delete()
+ q, p = 'DELETE FROM "t1" WHERE "x" = %s AND "y" = %s', [0, 1]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+
+class RelationTest(DbTest):
+
+ def test_new_success(self):
+
+ row = make_row(x=42, y=2, z=3)
+ self.db.insert_and_return_row.return_value = row
+ rec = self.rel.new(y=2, z=3)
+ eq_(rec, {'x': 42, 'y': 2, 'z': 3})
+
+ @raises(AttributeError)
+ def test_new_no_primary_key(self):
+
+ self.rel.pk = ()
+ self.rel.new(y=2, z=3)
+
+ def test_join_regex(self):
+
+ r = Relation._rx_join
+
+ ok_(r.match('join'))
+ ok_(r.match('left_join'))
+ ok_(r.match('left_outer_join'))
+ ok_(r.match('cross_join'))
+ ok_(r.match('inner_join'))
+ ok_(r.match('natural_left_join'))
+ ok_(r.match('natural_left_outer_join'))
+ ok_(r.match('natural_inner_join'))
+
+ ok_(not r.match('natural_cross_join'))
+ ok_(not r.match('left_inner_join'))
+ ok_(not r.match('abc'))
+ ok_(not r.match('join_me'))
+
+ def test_join(self):
+
+ j = self.rel.left_join('t2', using='x')
+ eq_(j.name, '"t1" LEFT JOIN "t2" USING (x)')
+ eq_(j.type, 'join')
+
+ j = self.rel.join('t2', on='y = a')
+ eq_(j.name, '"t1" JOIN "t2" ON y = a')
+ eq_(j.type, 'join')
+
+ j = self.rel.natural_left_outer_join('t2')
+ eq_(j.name, '"t1" NATURAL LEFT OUTER JOIN "t2"')
+ eq_(j.type, 'join')
+
+ j = self.rel.left_join('t2', using=('x',)).inner_join('t3', on='z = a')
+ eq_(j.name, '"t1" LEFT JOIN "t2" USING ("x") INNER JOIN "t3" ON z = a')
+ eq_(j.type, 'join')
+ j.select(where=('x < %s', [100]))
+ q = ('SELECT * FROM "t1" LEFT JOIN "t2" USING ("x") '
+ 'INNER JOIN "t3" ON z = a WHERE x < %s')
+ p = [100]
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+ def test_get_success(self):
+
+ row = make_row(x=1, y=2)
+ self.db.fetchall.return_value = (r for r in [row])
+ rec = self.rel.get(x=1)
+ eq_(type(rec), Record)
+ eq_(rec, {'x': 1, 'y': 2})
+ q, p = 'SELECT * FROM "t1" WHERE "x" = %s', [1]
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+ @raises(NoMatchingRecord)
+ def test_get_no_matching_record(self):
+
+ self.db.fetchall.return_value = (r for r in [])
+ self.rel.get(x=1)
+
+ def test_select(self):
+
+ self.db.fetchall.return_value = 'row iterator'
+ rows = self.rel.select()
+ eq_(rows, 'row iterator')
+ q, p = 'SELECT * FROM "t1"', []
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+ def test_select_columns(self):
+
+ self.db.fetchall.return_value = 'row iterator'
+ rows = self.rel.select(('c1', 'c2'))
+ eq_(rows, 'row iterator')
+ q, p = 'SELECT "c1", "c2" FROM "t1"', []
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+ def test_select_where_orderby(self):
+ self.db.fetchall.return_value = 'row iterator'
+ rows = self.rel.select(where={'y': 3}, orderby=('x', 'y', 3))
+ q, p = 'SELECT * FROM "t1" WHERE "y" = %s ORDER BY "x", "y", 3', [3]
+ eq_(self.db.fetchall.call_args, ((q, p),))
+ eq_(rows, 'row iterator')
+
+ def test_delete(self):
+
+ self.db.execute.return_value = 3
+ n = self.rel.delete()
+ eq_(n, 3)
+ q, p = 'DELETE FROM "t1"', []
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ def test_delete_where_returning(self):
+
+ self.db.fetchall.return_value = 'row iterator'
+ rows = self.rel.delete(where=('x > %s', [100]), returning=('x',))
+ eq_(rows, 'row iterator')
+ q, p = 'DELETE FROM "t1" WHERE x > %s RETURNING "x"', [100]
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+ def test_update(self):
+
+ self.db.execute.return_value = 80
+ n = self.rel.update({'y': 10, 'z': 0})
+ eq_(n, 80)
+ q, p = 'UPDATE "t1" SET "y" = %s, "z" = %s', [10, 0]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ def test_update_where_returning(self):
+
+ self.db.fetchall.return_value = 'row iterator'
+ rows = self.rel.update({'y': 10, 'z': 0}, where={'z': 5, 'y': 0},
+ returning='*')
+ eq_(rows, 'row iterator')
+ q = ('UPDATE "t1" SET "y" = %s, "z" = %s '
+ 'WHERE "y" = %s AND "z" = %s RETURNING *')
+ p = [10, 0, 0, 5]
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+ def test_insert(self):
+
+ self.db.execute.return_value = 17
+ n = self.rel.insert(row={'y': 10, 'z': 0})
+ eq_(n, 17)
+ q, p = 'INSERT INTO "t1" ("y", "z") VALUES (%s, %s)', [10, 0]
+ eq_(self.db.execute.call_args, ((q, p),))
+
+ def test_insert_returning(self):
+
+ self.db.fetchall.return_value = 'row iterator'
+ rows = self.rel.insert(columns='y, z', rows=[(10, 0), (11, 0)],
+ returning='x')
+ eq_(rows, 'row iterator')
+ q = ('INSERT INTO "t1" ("y", "z") VALUES (%s, %s), (%s, %s) '
+ 'RETURNING x')
+ p = [10, 0, 11, 0]
+ eq_(self.db.fetchall.call_args, ((q, p),))
+
+
+class RelatiomatTest(DbTest):
+
+ def test(self):
+
+ self.db.get_relation_info.return_value = ('table', ('x', 'y'))
+
+ rm = Relatiomat(self.db)
+
+ rel1 = rm.t1
+ eq_(type(rel1), Relation)
+ eq_(rel1.db, self.db)
+ eq_(rel1.name, '"t1"')
+ eq_(rel1.pk, ('x', 'y'))
+ eq_(self.db.get_relation_info.call_count, 1)
+
+ # relations are cached
+ rel2 = rm.t1
+ ok_(rel1 is rel2)
+ eq_(self.db.get_relation_info.call_count, 1)
+ rel3 = rm('t1')
+ ok_(rel3 is rel1)
+ rel4 = rm['t1']
+ ok_(rel4 is rel1)
+ eq_(self.db.get_relation_info.call_count, 1)
+
+ # ... unless the cache is invalidated:
+ rm._cache.clear()
+ rel5 = rm['t1']
+ ok_(rel5 is not rel1)
+ eq_(self.db.get_relation_info.call_count, 2)
+
+ self.db.get_relation_info.return_value = ('view', None)
+
+ rel6 = rm['t2']
+ ok_(rel6 is not rel1)
+ eq_(type(rel6), Relation)
+ eq_(rel6.db, self.db)
+ eq_(rel6.name, '"t2"')
+ eq_(rel6.type, 'view')
+ eq_(rel6.pk, ())
+ eq_(self.db.get_relation_info.call_count, 3)
+
+ self.db.get_relation_info.return_value = ('table', None)
+
+ rel7 = rm.s1__t3
+ eq_(type(rel7), Relation)
+ eq_(rel7.db, self.db)
+ eq_(rel7.name, '"s1"."t3"')
+ eq_(rel7.type, 'table')
+ eq_(rel7.pk, ())
+ eq_(self.db.get_relation_info.call_count, 4)
+
+ rel8 = rm('s1.t3')
+ ok_(rel8 is rel7)
+
+
+class MockedDatabaseTest(DbTest):
+
+ def test_transaction_failure(self):
+
+ self.db.execute.side_effect = Exception('boom')
+ try:
+ with self.db.transaction():
+ self.db.execute()
+ self.db.execute()
+ ok_(False)
+ except Exception as e:
+ eq_(str(e), 'boom')
+ eq_(self.db.begin.call_count, 1)
+ eq_(self.db.execute.call_count, 1)
+ eq_(self.db.rollback.call_count, 1)
+ eq_(self.db.commit.call_count, 0)
+
+ def test_transaction_success(self):
+
+ with self.db.transaction():
+ self.db.execute()
+ self.db.execute()
+ eq_(self.db.begin.call_count, 1)
+ eq_(self.db.execute.call_count, 2)
+ eq_(self.db.rollback.call_count, 0)
+ eq_(self.db.commit.call_count, 1)
+
+ def test_relatiomat_interface(self):
+
+ rm1 = self.db.t
+ rm2 = self.db.r
+ eq_(type(rm1), Relatiomat)
+ ok_(rm1 is rm2)
+
+
+class AbstractDatabaseTest(object):
+
+ ROWCOUNT_FOR_SELECT = True
+ SERIAL_KEY_TYPE = None
+
+ def test_connect(self):
+
+ eq_(getattr(self.db._tl, 'connection', None), None)
+ self.db.connect()
+ ok_(getattr(self.db._tl, 'connection', None) is not None)
+ self.db.disconnect()
+ eq_(getattr(self.db._tl, 'connection', None), None)
+
+ def test_get_relation_info(self):
+
+ # a simple table
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ rt, pk = self.db.get_relation_info('t1')
+ eq_(rt, 'table')
+ eq_(pk, ('c1',))
+
+ # a view
+ q = "CREATE VIEW v1 AS SELECT c1 FROM t1"
+ self.db.execute(q)
+ rt, pk = self.db.get_relation_info('v1')
+ eq_(rt, 'view')
+ eq_(pk, None)
+
+ # a table with a composite primary key
+ self.db.execute("""
+ CREATE TABLE t2 (
+ c1 INTEGER,
+ c2 TEXT,
+ CONSTRAINT t2_pk PRIMARY KEY (c1, c2)
+ )
+ """)
+ rt, pk = self.db.get_relation_info('t2')
+ eq_(rt, 'table')
+ eq_(pk, ('c1', 'c2'))
+
+ try:
+ self.db.get_relation_info('xx')
+ ok_(False)
+ except ValueError:
+ pass
+
+ def test_execute(self):
+
+ n = self.db.execute("SELECT 22")
+ eq_(n, 1 if self.ROWCOUNT_FOR_SELECT else None)
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ q = "UPDATE t1 SET c1 = 0"
+ n = self.db.execute(q)
+ eq_(n, 0)
+ q = "INSERT INTO t1 (c1, c2) VALUES (1, 'a'), (2, 'b')"
+ self.db.execute(q)
+ q = "UPDATE t1 SET c2 = 'x'"
+ n = self.db.execute(q)
+ eq_(n, 2)
+ q = "UPDATE t1 SET c2 = 'y' WHERE c1 = 1"
+ n = self.db.execute(q)
+ eq_(n, 1)
+
+ def test_script(self):
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ q = """
+ INSERT INTO t1 (c1, c2) VALUES (1, 'a'), (2, 'b');
+ INSERT INTO t1 (c1, c2) VALUES (3, 'c'), (4, 'd');
+ """
+ self.db.script(q)
+ q = "SELECT COUNT(*) AS c FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r.c, 4)
+
+ def test_fetchall(self):
+
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (1, 'a'), (2, 'b')"
+ self.db.execute(q)
+ q = "SELECT * FROM t1"
+ rows = self.db.fetchall(q)
+ ok_(hasattr(rows, '__iter__'))
+ ok_(not isinstance(rows, (tuple, list)))
+ rows = tuple(rows)
+ eq_(len(rows), 2)
+ eq_(rows[0], (1, 'a'))
+ eq_(rows[1], (2, 'b'))
+ eq_(rows[0].c1, 1)
+ eq_(rows[0].c2, 'a')
+
+ def test_fetchone(self):
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (1, 'a'), (2, 'b')"
+ self.db.execute(q)
+ q = "SELECT * FROM t1"
+ row = self.db.fetchone(q)
+ eq_(row, (1, 'a'))
+ eq_(row.c1, 1)
+ eq_(row.c2, 'a')
+
+ def test_insert_and_return_row(self):
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ relation = self.db.r.t1
+ row = self.db.insert_and_return_row(relation, {'c1': 1, 'c2': 'a'})
+ eq_(row, (1, 'a'))
+ eq_(row.c1, 1)
+ eq_(row.c2, 'a')
+ row = self.db.insert_and_return_row(relation, {'c1': 2})
+ eq_(row, (2, None))
+ self.db.execute("""
+ CREATE TABLE t2 (
+ c1 INTEGER,
+ c2 TEXT,
+ CONSTRAINT t2_pk PRIMARY KEY (c1, c2)
+ )
+ """)
+ relation = self.db.r.t2
+ row = self.db.insert_and_return_row(relation, {'c1': 1, 'c2': 'a'})
+ eq_(row, (1, 'a'))
+ row = self.db.insert_and_return_row(relation, {'c1': 1, 'c2': 'b'})
+ eq_(row, (1, 'b'))
+
+ def test_transaction(self):
+
+ # autocommit by default
+ q = "CREATE TABLE t1 (c1 INTEGER PRIMARY KEY, c2 TEXT)"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (1, 'a'), (2, 'b')"
+ self.db.execute(q)
+ self.db.rollback()
+ q = "SELECT COUNT(*) AS c1 FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r, (2,))
+
+ # no changes on rollback
+ self.db.begin()
+ q = "INSERT INTO t1 (c1, c2) VALUES (3, 'c')"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (4, 'd')"
+ self.db.execute(q)
+ self.db.rollback()
+ q = "SELECT COUNT(*) AS c1 FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r, (2,))
+
+ # commit commits
+ self.db.begin()
+ q = "INSERT INTO t1 (c1, c2) VALUES (3, 'c')"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (4, 'd')"
+ self.db.execute(q)
+ self.db.commit()
+ q = "SELECT COUNT(*) AS c1 FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r, (4,))
+
+ # context manager with exception
+ try:
+ with self.db.transaction():
+ q = "INSERT INTO t1 (c1, c2) VALUES (5, 'e')"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (6, 'f')"
+ self.db.execute(q)
+ raise ValueError('ohno')
+ except ValueError:
+ q = "SELECT COUNT(*) AS c1 FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r, (4,))
+
+ # succeeding context manager
+ with self.db.transaction():
+ q = "INSERT INTO t1 (c1, c2) VALUES (5, 'e')"
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c1, c2) VALUES (6, 'f')"
+ self.db.execute(q)
+ q = "SELECT COUNT(*) AS c1 FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r, (6,))
+
+ # autocommit is on again
+ q = "INSERT INTO t1 (c1, c2) VALUES (7, 'g'), (8, 'h')"
+ self.db.execute(q)
+ self.db.rollback()
+ q = "SELECT COUNT(*) AS c1 FROM t1"
+ r = self.db.fetchone(q)
+ eq_(r, (8,))
+
+ def test_record(self):
+
+ q = ("CREATE TABLE t1 (c1 %s, c2 TEXT, c3 INTEGER DEFAULT 3)" %
+ self.SERIAL_KEY_TYPE)
+ self.db.execute(q)
+ q = "INSERT INTO t1 (c2) VALUES ('a'), ('b')"
+ self.db.execute(q)
+
+ r = self.db.r.t1.get(c2='a')
+ eq_(r, {'c1': 1, 'c2': 'a', 'c3': 3})
+ r = self.db.r.t1.new()
+ eq_(r, {'c1': 3, 'c2': None, 'c3': 3})
+
+ def test_keyword_names(self):
+
+ # note: some limitations come from the name constraints of named tuples
+
+ q = 'CREATE TABLE "select" ("integer" INTEGER, "text" TEXT)'
+ self.db.execute(q)
+ self.db.r.select.insert(row={'integer': 1, 'text': 'x'})
+ r = self.db.r.select.get(integer=1)
+ eq_(r, {'text': 'x', 'integer': 1})
+
+
+class SqliteDatabaseTest(AbstractDatabaseTest, unittest.TestCase):
+
+ ROWCOUNT_FOR_SELECT = False
+ SERIAL_KEY_TYPE = 'INTEGER PRIMARY KEY ASC'
+
+ def setUp(self):
+ self.db = SQLiteDatabase(config={'path': ':memory:'})
+
+
+class PostgresDatabaseTest(AbstractDatabaseTest, unittest.TestCase):
+
+ SERIAL_KEY_TYPE = 'SERIAL PRIMARY KEY'
+
+ test_tables = ('t1', 't2', 'select')
+ test_schemas = ('s1', 's2')
+
+ PG_BINS = (
+ '/opt/postgresql/9.3/bin',
+ '/usr/bin',
+ '/usr/local/bin',
+ )
+
+ @classmethod
+ def setUpClass(cls):
+
+ here = os.path.dirname(__file__)
+        pgconf = os.path.join(here, 'ressources', 'postgresql.conf')
+ pgtc = os.path.realpath(os.path.join(here, '..', '..', 'var', 'pgtc'))
+
+ for pgbin in cls.PG_BINS:
+ if os.path.exists(os.path.join(pgbin, 'postgres')):
+ break
+ create = not os.path.exists(os.path.join(pgtc, 'PG_VERSION'))
+ if create:
+ print('creating new postgres test cluster')
+ os.makedirs(pgtc)
+ initdb = os.path.join(pgbin, 'initdb')
+ subprocess.call([initdb, '--pgdata=%s' % pgtc, '--encoding=utf-8'])
+ shutil.copy(pgconf, os.path.join(pgtc, os.path.basename(pgconf)))
+ print('starting postgres')
+ postgres = os.path.join(pgbin, 'postgres')
+ cls.pgproc = subprocess.Popen([postgres, '-D', pgtc])
+
+ print('try to connect to postgres server')
+ user = os.environ['USER']
+ tries = 0
+ while True:
+ try:
+ psycopg2.connect(host=pgtc, database='postgres', user=user)
+ break
+ except psycopg2.OperationalError:
+ if tries > 20:
+ raise RuntimeError('failed to connect to postgres test '
+ 'cluster')
+ time.sleep(0.25)
+ tries += 1
+
+ if create:
+ createdb = os.path.join(pgbin, 'createdb')
+ subprocess.check_call([createdb, '-h', pgtc, 'deeno'])
+
+ cls.pgtc = pgtc
+
+ @classmethod
+ def tearDownClass(cls):
+
+ cls.pgproc.terminate()
+
+ def setUp(self):
+
+ self.db = PostgresDatabase(config={'host': self.pgtc,
+ 'database': 'deeno', 'user': os.environ['USER']})
+
+ def tearDown(self):
+
+ self.db.disconnect()
+
+ for s in self.test_schemas:
+ q = 'DROP SCHEMA IF EXISTS "%s" CASCADE' % s
+ self.db.execute(q)
+ for t in self.test_tables:
+ q = 'DROP TABLE IF EXISTS "%s" CASCADE' % t
+ self.db.execute(q)
+
+ self.db.disconnect()
+
+ def test_get_relation_info_schema(self):
+
+ q = "CREATE SCHEMA s1"
+ self.db.execute(q)
+ q = "CREATE TABLE s1.t1 (c1 INTEGER PRIMARY KEY, c2 TEXT DEFAULT 'x')"
+ self.db.execute(q)
+ rt, pk = self.db.get_relation_info('s1.t1')
+ eq_(rt, 'table')
+ eq_(pk, ('c1',))
+
+ q = "CREATE SCHEMA s2"
+ self.db.execute(q)
+ q = "CREATE TABLE s2.t1 (ca INTEGER PRIMARY KEY, cb TEXT DEFAULT 'x')"
+ self.db.execute(q)
+
+ q = "SET search_path TO public,s1"
+ self.db.execute(q)
+
+ rt, pk = self.db.get_relation_info('t1')
+ eq_(rt, 'table')
+ eq_(pk, ('c1',))
+
+ q = "SET search_path TO public,s2"
+ self.db.execute(q)
+ rt, pk = self.db.get_relation_info('t1')
+ eq_(rt, 'table')
+ eq_(pk, ('ca',))
+
+ def test_record_schema(self):
+
+ q = "CREATE SCHEMA s1"
+ self.db.execute(q)
+ q = "CREATE TABLE s1.t1 (c1 SERIAL PRIMARY KEY, c2 TEXT DEFAULT 'x')"
+ self.db.execute(q)
+
+ r = self.db.r.s1__t1.new()
+ eq_(r, {'c1': 1, 'c2': 'x'})
+ r = self.db.r.s1__t1.new(c2='y')
+ eq_(r, {'c1': 2, 'c2': 'y'})
+ r = self.db.r['s1.t1'].get(c1=2)
+ eq_(r, {'c1': 2, 'c2': 'y'})
A => src/tests/test_sql.py +277 -0
@@ 0,0 1,277 @@
+# -*- coding: utf-8 -*-
+import unittest
+
+from nose.tools import eq_, ok_
+
+from deeno import sql
+from deeno.db import Relation
+
+from tests import MockedDatabase
+
+
+class SqlTest(unittest.TestCase):
+
+ def setUp(self):
+
+ self.db = MockedDatabase()
+ self.r = Relation(self.db, 't1', 'table', 'xid')
+
+
+class JoinTest(SqlTest):
+
+    # note: joined table names are expected to be quoted by `Relation` already
+
+ def test_on(self):
+ q = sql.join(self.r, 'join', 't2', on='a = b')
+ eq_(q, '"t1" JOIN t2 ON a = b')
+
+ def test_using(self):
+ q = sql.join(self.r, 'left join', 't2', using=('c1', 'c2'))
+ eq_(q, '"t1" LEFT JOIN t2 USING ("c1", "c2")')
+
+ q = sql.join(self.r, 'left join', 't2', using='c1, c2')
+ eq_(q, '"t1" LEFT JOIN t2 USING (c1, c2)')
+
+ def test_multiple(self):
+ r = Relation(self.db, 't1 LEFT JOIN t2 USING (c1, c2)', 'join', None)
+ q = sql.join(r, 'right outer join', 't3', using=('c3',))
+ eq_(q, 't1 LEFT JOIN t2 USING (c1, c2) '
+ 'RIGHT OUTER JOIN t3 USING ("c3")')
+
+ def test_natural(self):
+ q = sql.join(self.r, 'natural join', 't2')
+ eq_(q, '"t1" NATURAL JOIN t2')
+
+
+class SelectTest(SqlTest):
+
+ def test_simple(self):
+ q, p = sql.select(self.r)
+ eq_(q, 'SELECT * FROM "t1"')
+ eq_(p, [])
+
+ def test_simple_with_columns(self):
+ q, p = sql.select(self.r, ('c1', 'c2', 'c3'))
+ eq_(q, 'SELECT "c1", "c2", "c3" FROM "t1"')
+ eq_(p, [])
+
+ def test_where(self):
+ w = {'c1': 55, 'c2': 'foo bar'}
+ q, p = sql.select(self.r, ('c1', 'c2', 'c3'), where=w)
+ eq_(q, 'SELECT "c1", "c2", "c3" FROM "t1" '
+ 'WHERE "c1" = %s AND "c2" = %s')
+ eq_(p, [55, 'foo bar'])
+
+ w = ("c1 = %s AND c2::text = 'bar foo'", [0.2])
+ q, p = sql.select(self.r, ('c1', 'c2', 'c3'), where=w)
+ eq_(q, 'SELECT "c1", "c2", "c3" FROM "t1" '
+ "WHERE c1 = %s AND c2::text = 'bar foo'")
+ eq_(p, [0.2])
+
+ @unittest.skip
+ def test_groupby(self):
+ pass # TODO
+
+ @unittest.skip
+ def test_having(self):
+ pass # TODO
+
+ @unittest.skip
+ def test_orderby(self):
+ pass # TODO
+
+ @unittest.skip
+ def test_limit(self):
+ pass # TODO
+
+ @unittest.skip
+ def test_offset(self):
+ pass # TODO
+
+
+class UpdateTest(SqlTest):
+
+ def test_assign(self):
+
+ a = {'c1': 55, 'c2': '2020-01-01'}
+ q, p = sql.update(self.r, a)
+ eq_(q, 'UPDATE "t1" SET "c1" = %s, "c2" = %s')
+ eq_(p, [55, '2020-01-01'])
+
+ a = ("c1 = %s, c2 = '2020-01-01'", [55])
+ q, p = sql.update(self.r, a)
+ eq_(q, "UPDATE \"t1\" SET c1 = %s, c2 = '2020-01-01'")
+ eq_(p, [55])
+
+ def test_where(self):
+
+ a = {'c1': True}
+ w = {'c1': False, 'c2': 'foo'}
+ q, p = sql.update(self.r, a, where=w)
+ eq_(q, 'UPDATE "t1" SET "c1" = %s WHERE "c1" = %s AND "c2" = %s')
+ eq_(p, [True, False, 'foo'])
+
+ a = {'c1': True}
+ w = ("c1 = %s AND c2 = 'foo'", [False])
+ q, p = sql.update(self.r, a, where=w)
+ eq_(q, "UPDATE \"t1\" SET \"c1\" = %s WHERE c1 = %s AND c2 = 'foo'")
+ eq_(p, [True, False])
+
+ a = ("c1 = %s, c2 = 'bar'", [True])
+ w = ("c1 = %s AND c2 = 'foo'", [False])
+ q, p = sql.update(self.r, a, where=w)
+ eq_(q, "UPDATE \"t1\" SET c1 = %s, c2 = 'bar' "
+ "WHERE c1 = %s AND c2 = 'foo'")
+ eq_(p, [True, False])
+
+ def test_returning(self):
+
+ a = {'plan': 'free'}
+ w = ('created < now() AND logins > %s', [100])
+ q, p = sql.update(self.r, a, where=w, returning=('id', 'name'))
+ eq_(q, 'UPDATE "t1" SET "plan" = %s '
+ 'WHERE created < now() AND logins > %s RETURNING "id", "name"')
+ eq_(p, ['free', 100])
+
+ q, p = sql.update(self.r, a, where=w, returning='id, name')
+ eq_(q, 'UPDATE "t1" SET "plan" = %s '
+ 'WHERE created < now() AND logins > %s RETURNING id, name')
+ eq_(p, ['free', 100])
+
+ def test_placeholder_and_quote_char(self):
+
+ self.db.placeholder = '?'
+ self.db.quote_char = '`'
+ self.r = Relation(self.db, 't1', 'table', 'xid')
+ a = {'plan': 'free'}
+ w = ('created < now() AND logins > ?', [100])
+ q, p = sql.update(self.r, a, where=w, returning=('id', 'name'))
+ eq_(q, 'UPDATE `t1` SET `plan` = ? '
+ 'WHERE created < now() AND logins > ? RETURNING `id`, `name`')
+ eq_(p, ['free', 100])
+
+
+class InsertTest(SqlTest):
+
+ def test_defaults_only(self):
+
+ q, p = sql.insert(self.r)
+ eq_(q, 'INSERT INTO "t1" DEFAULT VALUES')
+ eq_(p, [])
+
+ q, p = sql.insert(self.r, returning='*')
+ eq_(q, 'INSERT INTO "t1" DEFAULT VALUES RETURNING *')
+ eq_(p, [])
+
+ q, p = sql.insert(self.r, returning=('a', 'b'))
+ eq_(q, 'INSERT INTO "t1" DEFAULT VALUES RETURNING "a", "b"')
+ eq_(p, [])
+
+ def test_value_error(self):
+
+ try:
+ sql.insert(self.r, columns=('a', 'b'))
+ ok_(False)
+ except ValueError as e:
+ eq_(str(e), 'need some rows')
+
+ try:
+ sql.insert(self.r, columns=('a', 'b'), row=[1, 2, 3])
+ ok_(False)
+ except ValueError as e:
+ eq_(str(e), 'number of columns and row elements differ')
+
+ try:
+ sql.insert(self.r, columns=('a', 'b'), rows=[[1, 2], [1, 2, 3]])
+ ok_(False)
+ except ValueError as e:
+ eq_(str(e), 'rows must have same number of elements')
+
+ try:
+ sql.insert(self.r, columns=('a', 'b'), rows=[{'c1': 1, 'c2': 'a'}])
+ ok_(False)
+ except ValueError as e:
+ eq_(str(e), 'either provide columns or dict rows, not both')
+
+ try:
+ sql.insert(self.r, rows=[{}, {}])
+ ok_(False)
+ except ValueError as e:
+ eq_(str(e), 'insert with defaults only works for one row')
+
+ try:
+ sql.insert(self.r, rows=[[], []])
+ ok_(False)
+ except ValueError as e:
+            eq_(str(e), 'either provide columns or dictionary rows')
+
+ try:
+ sql.insert(self.r, columns=(), rows=[[], []])
+ ok_(False)
+ except ValueError as e:
+ eq_(str(e), 'insert with defaults only works for one row')
+
+ def test_single_row_with_columns(self):
+
+ q, p = sql.insert(self.r, columns=('c1', 'c2'), row=[1, 'a'])
+ eq_(q, 'INSERT INTO "t1" ("c1", "c2") VALUES (%s, %s)')
+ eq_(p, [1, 'a'])
+
+ def test_single_row_as_dict(self):
+
+ q, p = sql.insert(self.r, row={'c1': 1, 'c2': 'a'})
+ eq_(q, 'INSERT INTO "t1" ("c1", "c2") VALUES (%s, %s)')
+ eq_(p, [1, 'a'])
+
+ def test_multiple_rows_with_columns(self):
+
+ rows = [[1, 'a'], [2, 'b']]
+ q, p = sql.insert(self.r, columns=('c1', 'c2'), rows=rows)
+ eq_(q, 'INSERT INTO "t1" ("c1", "c2") VALUES (%s, %s), (%s, %s)')
+ eq_(p, [1, 'a', 2, 'b'])
+
+ def test_multiple_rows_as_dicts(self):
+
+ rows = [{'c1': 1, 'c2': 'a'}, {'c1': 2, 'c2': 'b'}]
+ q, p = sql.insert(self.r, rows=rows)
+ eq_(q, 'INSERT INTO "t1" ("c1", "c2") VALUES (%s, %s), (%s, %s)')
+ eq_(p, [1, 'a', 2, 'b'])
+
+
+class DeleteTest(SqlTest):
+
+ def test_all(self):
+
+ q, p = sql.delete(self.r)
+ eq_(q, 'DELETE FROM "t1"')
+ eq_(p, [])
+
+ def test_where(self):
+
+ w = {'c1': False, 'c2': 'foo'}
+ q, p = sql.delete(self.r, where=w)
+ eq_(q, 'DELETE FROM "t1" WHERE "c1" = %s AND "c2" = %s')
+ eq_(p, [False, 'foo'])
+
+ w = 'c1 = %s AND c2 = %s', [False, 'foo']
+ q, p = sql.delete(self.r, where=w)
+ eq_(q, 'DELETE FROM "t1" WHERE c1 = %s AND c2 = %s')
+ eq_(p, [False, 'foo'])
+
+ def test_returning(self):
+
+ w = {'c1': False}
+ q, p = sql.delete(self.r, where=w, returning=('c2', 'c3'))
+ eq_(q, 'DELETE FROM "t1" WHERE "c1" = %s RETURNING "c2", "c3"')
+ eq_(p, [False])
+
+ w = {'c1': False}
+ q, p = sql.delete(self.r, where=w, returning='c2, c3')
+ eq_(q, 'DELETE FROM "t1" WHERE "c1" = %s RETURNING c2, c3')
+ eq_(p, [False])
A => src/tests/test_util.py +23 -0
@@ 0,0 1,23 @@
+import unittest
+
+from nose.tools.trivial import eq_
+
+from deeno.util import cachedproperty
+
+
+class CachedpropertyTest(unittest.TestCase):
+
+ def test(self):
+
+ class X(object):
+ i = 0
+
+ @cachedproperty
+ def m(self):
+ self.i += 1
+ return self.i
+
+ x = X()
+ eq_(x.m, 1)
+ eq_(x.m, 1)
+ eq_(x._m_cached_, 1)
A => src/tools/__init__.py +0 -0
A => src/tools/style.py +30 -0
@@ 0,0 1,30 @@
+import subprocess
+import sys
+
+PEP8_IGNORE = ','.join([
+ 'E128',
+ 'E401', # multiple imports on one line
+])
+
+
+def main():
+
+ only_changed = False
+ if len(sys.argv) == 2 and sys.argv[1] == 'changed':
+ only_changed = True
+ elif len(sys.argv) != 1:
+ print('usage: style [changed]')
+ sys.exit(1)
+
+ paths = ['src', 'setup.py']
+ if only_changed:
+ cmd = ['hg', 'status', '-nma'] + paths
+ out = subprocess.check_output(cmd, universal_newlines=True)
+ fnames = out.strip().split()
+ fnames = [f for f in fnames if f.endswith('.py')]
+ else:
+ fnames = paths
+ if not fnames:
+ return 0
+ ret = subprocess.call(['bin/pep8', '--ignore=%s' % PEP8_IGNORE] + fnames)
+ return ret