=== added file 'lib/lp/services/database/bulk.py'
--- lib/lp/services/database/bulk.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/database/bulk.py	2010-04-30 15:26:06 +0000
@@ -0,0 +1,66 @@
+# Copyright 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Optimized bulk operations against the database."""
+
+__metaclass__ = type
+__all__ = [
+    'reload',
+    ]
+
+
+from collections import defaultdict
+
+from zope.security.proxy import removeSecurityProxy
+
+from storm.base import Storm
+from storm.expr import In
+from storm.info import get_cls_info
+from storm.store import Store
+
+
+def collate(things, key):
+    """Group the given objects by the value of a key function.
+
+    Returns an iterator of (key-value, list-of-things) tuples. Unlike
+    itertools.groupby, the objects do not need to be sorted beforehand.
+    """
+    groups = defaultdict(list)
+    for item in things:
+        groups[key(item)].append(item)
+    return groups.iteritems()
+
+
+def get_type(thing):
+    """Return the type of the object, looking through security proxies.
+
+    For a security-proxied object this is the type of the wrapped object.
+    """
+    unwrapped = removeSecurityProxy(thing)
+    return type(unwrapped)
+
+
+def gen_reload_queries(objects):
+    """Yield a query for each (type, store) group of the given objects."""
+    for cls, same_type in collate(objects, get_type):
+        # Only Storm classes with a one-column primary key are supported.
+        if not issubclass(cls, Storm):
+            raise AssertionError(
+                "Cannot load objects of type %s: %r" % (
+                    cls.__name__, same_type))
+        key_columns = get_cls_info(cls).primary_key
+        if len(key_columns) != 1:
+            raise AssertionError(
+                "Compound primary keys are not supported: %s." %
+                cls.__name__)
+        key_column = key_columns[0]
+        # Each store gets its own query for the objects it holds.
+        for store, same_store in collate(same_type, Store.of):
+            key_values = [key_column.__get__(obj) for obj in same_store]
+            yield store.find(cls, In(key_column, key_values))
+
+
+def reload(objects):
+    """Reload the given objects, using one query per type and store."""
+    for result_set in gen_reload_queries(objects):
+        list(result_set)  # Consuming the result set executes the query.

=== added file 'lib/lp/services/database/tests/test_bulk.py'
--- lib/lp/services/database/tests/test_bulk.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/database/tests/test_bulk.py	2010-04-30 15:26:06 +0000
@@ -0,0 +1,152 @@
+# Copyright 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test the bulk database functions."""
+
+__metaclass__ = type
+
+import unittest
+
+import transaction
+
+import zope.security.checker
+import zope.security.proxy
+
+from storm.info import get_obj_info
+
+from canonical.launchpad.interfaces.lpstorm import (
+    IMasterStore, ISlaveStore, IStore)
+from canonical.testing import LaunchpadZopelessLayer
+
+from lp.bugs.model.bug import BugAffectsPerson
+from lp.services.database import bulk
+from lp.testing import TestCase, TestCaseWithFactory
+
+
+object_is_key = lambda thing: thing  # Identity key function for collate().
+
+
+class TestBasicFunctions(TestCase):
+    """Tests for the collate() and get_type() helpers."""
+
+    def test_collate_empty_list(self):
+        self.assertEqual([], list(bulk.collate([], object_is_key)))
+
+    def test_collate_when_object_is_key(self):
+        # With the identity key, equal objects end up in the same group.
+        self.assertEqual([(1, [1])], list(bulk.collate([1], object_is_key)))
+        collated = sorted(bulk.collate([1, 2, 2], object_is_key))
+        self.assertEqual([(1, [1]), (2, [2, 2])], collated)
+
+    def test_collate_with_key_function(self):
+        # Objects are grouped by key value, keeping their input order.
+        collated = sorted(bulk.collate(['fred', 'barney', 'joss'], len))
+        self.assertEqual([(4, ['fred', 'joss']), (6, ['barney'])], collated)
+
+    def test_get_type(self):
+        self.assertEqual(object, bulk.get_type(object()))
+
+    def test_get_type_with_proxied_object(self):
+        # The type reported is that of the wrapped object, not the proxy.
+        checker = zope.security.checker.Checker({})
+        proxied_object = zope.security.proxy.Proxy('fred', checker)
+        self.assertEqual(str, bulk.get_type(proxied_object))
+
+
+class TestLoaders(TestCaseWithFactory):
+    """Tests for gen_reload_queries() and reload()."""
+
+    layer = LaunchpadZopelessLayer
+
+    def assertQueriesLoadDisjointly(self, expected_objects, db_queries):
+        """Assert that the queries together load `expected_objects`.
+
+        Each object must be returned by only one of the queries.
+        """
+        seen = set()
+        for db_query in db_queries:
+            loaded = set(db_query)
+            # None of these objects should have been loaded before.
+            self.assertEqual(set(), loaded & seen)
+            seen |= loaded
+        self.assertEqual(expected_objects, seen)
+
+    def test_gen_reload_queries_with_empty_list(self):
+        self.assertEqual([], list(bulk.gen_reload_queries([])))
+
+    def test_gen_reload_queries_with_single_object(self):
+        # A single object is reloaded by a single query.
+        db_objects = [self.factory.makeSourcePackageName()]
+        db_queries = list(bulk.gen_reload_queries(db_objects))
+        self.assertEqual(1, len(db_queries))
+        self.assertEqual(db_objects, list(db_queries[0]))
+
+    def test_gen_reload_queries_with_multiple_similar_objects(self):
+        # Several objects of the same type are reloaded by one query.
+        db_objects = set(
+            self.factory.makeSourcePackageName() for i in range(5))
+        db_queries = list(bulk.gen_reload_queries(db_objects))
+        self.assertEqual(1, len(db_queries))
+        self.assertEqual(db_objects, set(db_queries[0]))
+
+    def test_gen_reload_queries_with_mixed_objects(self):
+        # There is one query for each distinct type of object given.
+        db_objects = set(
+            self.factory.makeSourcePackageName() for i in range(5))
+        db_objects.update(
+            self.factory.makeComponent() for i in range(5))
+        db_queries = list(bulk.gen_reload_queries(db_objects))
+        self.assertEqual(2, len(db_queries))
+        self.assertQueriesLoadDisjointly(db_objects, db_queries)
+
+    def test_gen_reload_queries_with_mixed_stores(self):
+        # There is one query for each distinct store, even when all
+        # the objects are of the same type.
+        db_object = self.factory.makeComponent()
+        db_object_type = bulk.get_type(db_object)
+        # Commit so the database object is available in both master
+        # and slave stores.
+        transaction.commit()
+        master_copy = IMasterStore(db_object).get(
+            db_object_type, db_object.id)
+        slave_copy = ISlaveStore(db_object).get(
+            db_object_type, db_object.id)
+        db_objects = set([master_copy, slave_copy])
+        db_queries = list(bulk.gen_reload_queries(db_objects))
+        self.assertEqual(2, len(db_queries))
+        self.assertQueriesLoadDisjointly(db_objects, db_queries)
+
+    def test_gen_reload_queries_with_non_Storm_objects(self):
+        # Objects that are not Storm instances are rejected once the
+        # generator is consumed.
+        db_queries = bulk.gen_reload_queries(['fred'])
+        self.assertRaisesWithContent(
+            AssertionError,
+            "Cannot load objects of type str: ['fred']",
+            list, db_queries)
+
+    def test_gen_reload_queries_with_compound_primary_keys(self):
+        # Types with a compound primary key are rejected once the
+        # generator is consumed.
+        db_queries = bulk.gen_reload_queries([BugAffectsPerson()])
+        self.assertRaisesWithContent(
+            AssertionError,
+            'Compound primary keys are not supported: BugAffectsPerson.',
+            list, db_queries)
+
+    def test_load(self):
+        # reload() reloads the given objects, which clears the
+        # 'invalidated' flag set on them by Store.invalidate().
+        db_object = self.factory.makeComponent()
+        naked_object = zope.security.proxy.removeSecurityProxy(db_object)
+        object_info = get_obj_info(naked_object)
+        IStore(db_object).flush()
+        self.assertEqual(None, object_info.get('invalidated'))
+        IStore(db_object).invalidate(db_object)
+        self.assertEqual(True, object_info.get('invalidated'))
+        bulk.reload([db_object])
+        self.assertEqual(None, object_info.get('invalidated'))
+
+
+def test_suite():  # Return a suite of all the tests in this module.
+    return unittest.TestLoader().loadTestsFromName(__name__)

