Commit 7beb0db7 authored by Aymeric Augustin's avatar Aymeric Augustin
Browse files

Fixed #10320 -- Made it possible to use executemany with iterators. Thanks MockSoul for the report.



git-svn-id: http://code.djangoproject.com/svn/django/trunk@17387 bcc190cf-cafb-0310-a4f2-bffc1f526a37
parent 4d030e54
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -670,6 +670,9 @@ class FormatStylePlaceholderCursor(object):
            raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2]

    def executemany(self, query, params=None):
        # cx_Oracle doesn't support iterators, convert them to lists
        if params is not None and not isinstance(params, (list, tuple)):
            params = list(params)
        try:
            args = [(':arg%d' % i) for i in range(len(params[0]))]
        except (IndexError, TypeError):
+7 −3
Original line number Diff line number Diff line
@@ -58,8 +58,12 @@ class CursorDebugWrapper(CursorWrapper):
        finally:
            stop = time()
            duration = stop - start
            try:
                times = len(param_list)
            except TypeError:           # param_list could be an iterator
                times = '?'
            self.db.queries.append({
                'sql': '%s times: %s' % (len(param_list), sql),
                'sql': '%s times: %s' % (times, sql),
                'time': "%.3f" % duration,
            })
            logger.debug('(%.3f) %s; args=%s' % (duration, sql, param_list),
+30 −9
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ from django.db.backends.signals import connection_created
from django.db.backends.postgresql_psycopg2 import version as pg_version
from django.db.utils import ConnectionHandler, DatabaseError, load_backend
from django.test import TestCase, skipUnlessDBFeature, TransactionTestCase
from django.test.utils import override_settings
from django.utils import unittest

from . import models
@@ -308,23 +309,43 @@ class EscapingChecks(TestCase):


class BackendTestCase(TestCase):
    def test_cursor_executemany(self):
        #4896: Test cursor.executemany

    def create_squares_with_executemany(self, args):
        cursor = connection.cursor()
        qn = connection.ops.quote_name
        opts = models.Square._meta
        f1, f2 = opts.get_field('root'), opts.get_field('square')
        query = ('INSERT INTO %s (%s, %s) VALUES (%%s, %%s)'
                 % (connection.introspection.table_name_converter(opts.db_table), qn(f1.column), qn(f2.column)))
        cursor.executemany(query, [(i, i**2) for i in range(-5, 6)])
        tbl = connection.introspection.table_name_converter(opts.db_table)
        f1 = connection.ops.quote_name(opts.get_field('root').column)
        f2 = connection.ops.quote_name(opts.get_field('square').column)
        query = 'INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (tbl, f1, f2)
        cursor.executemany(query, args)

    def test_cursor_executemany(self):
        #4896: Test cursor.executemany
        args = [(i, i**2) for i in range(-5, 6)]
        self.create_squares_with_executemany(args)
        self.assertEqual(models.Square.objects.count(), 11)
        for i in range(-5, 6):
            square = models.Square.objects.get(root=i)
            self.assertEqual(square.square, i**2)

    def test_cursor_executemany_with_empty_params_list(self):
        #4765: executemany with params=[] does nothing
        cursor.executemany(query, [])
        self.assertEqual(models.Square.objects.count(), 11)
        args = []
        self.create_squares_with_executemany(args)
        self.assertEqual(models.Square.objects.count(), 0)

    def test_cursor_executemany_with_iterator(self):
        #10320: executemany accepts iterators
        args = iter((i, i**2) for i in range(-3, 2))
        self.create_squares_with_executemany(args)
        self.assertEqual(models.Square.objects.count(), 5)

        args = iter((i, i**2) for i in range(3, 7))
        with override_settings(DEBUG=True):
            # same test for DebugCursorWrapper
            self.create_squares_with_executemany(args)
        self.assertEqual(models.Square.objects.count(), 9)


    def test_unicode_fetches(self):
        #6254: fetchone, fetchmany, fetchall return strings as unicode objects