"""The export() function herein is called by the kofaexport script.

Installing `psutil` in your local virtualenv (``pip install psutil``) is
optional: when it is available, the memory usage of the process is
reported before and after every exported part; otherwise the memory
figures are printed as ``None``.
"""
import argparse
import gc
import grok
import itertools
import os
import tempfile
import time
from ZODB import DB, DemoStorage, FileStorage
from ZODB.blob import BlobStorage
from zope.component import getUtility
from waeup.kofa.interfaces import ICSVExporter

#: Remember what packages were grokked already.
grokked = dict()

#: Packages that have to be 'grokked' in order to register exporters
#: and everything else.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = ("waeup.kofa", "kofacustom.nigeria", "waeup.uniben")

#: The data root from where we can look out for Data.fs, etc. This is
#: normally the `var/` dir of an instance.
VAR_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(__file__)))), 'var')

#: The name of the `University` instance in which we look for export
#: items.
APP_NAME = "uniben"

#: The name under which the exporter to use is registered.
EXPORTER_NAME = "students"


def handle_options():
    """Handle commandline options.

    No options are defined yet; parsing still provides ``--help`` and
    rejects unexpected arguments.
    """
    parser = argparse.ArgumentParser(
        description="Export WAeUP kofa data")
    args = parser.parse_args()
    return args


def grok_bases():
    """Grok packages not already grokked.

    Packages are grokked in the order given by `TO_BE_GROKKED`; each one
    is recorded in `grokked` so repeated calls do not grok it again.
    """
    for pkg_name in TO_BE_GROKKED:
        if grokked.get(pkg_name, False):
            continue
        print("Grokking %s..." % pkg_name)
        grokked[pkg_name] = True
        grok.testing.grok(pkg_name)
        print("Done.")


def init_dbs():
    """Setup databases.

    Opens `Data.fs` (wrapped in a blob storage) and `Data.async.fs` from
    `VAR_DIR` read-only, then groks the base packages.

    We return a sequence of `closables`. The closables can be passed to
    close_dbs() if you're done.

    The first of the elements returned is the main database. open() it
    for a new connection to the ZODB.
    """
    # https://github.com/zopefoundation/ZODB/\
    # blob/master/src/ZODB/cross-database-references.txt
    databases = {}
    db_dir = os.path.join(VAR_DIR, 'filestorage')
    blob_dir = os.path.join(VAR_DIR, 'blobstorage')
    db_path = os.path.join(db_dir, 'Data.fs')
    async_db_path = os.path.join(db_dir, 'Data.async.fs')
    async_storage = FileStorage.FileStorage(async_db_path, read_only=True)
    # Both DBs are registered in the same `databases` mapping, which is
    # what makes cross-database references (see link above) resolvable.
    db1 = DB(async_storage, database_name="async", databases=databases)
    base_storage = FileStorage.FileStorage(db_path, read_only=True)
    blob_storage = BlobStorage(blob_dir, base_storage)
    db2 = DB(blob_storage, databases=databases)
    # Opening a connection up front fails early if the main DB is not
    # usable; close it right away so the connection is not leaked.
    db2.open().close()
    grok_bases()
    return (db2, db1, base_storage, blob_storage, async_storage)


def close_dbs(closables):
    """Close all elements in `closables`.
    """
    for elem in closables:
        print("Closing %s..." % elem)
        elem.close()
    print("Done.")


def get_university(conn):
    """Return the `APP_NAME` object from the root "Application" folder
    reachable through ZODB connection `conn`.
    """
    return conn.root()["Application"][APP_NAME]


def get_all_students(container, cnt_from=0, cnt_to=0):
    """Yield the elements of `container` with (zero-based) positions
    `cnt_from` up to and including `cnt_to`, in key iteration order.

    NOTE: ``cnt_to=0`` (the default) means "no upper bound", i.e. all
    elements from `cnt_from` onwards are yielded.
    """
    cnt = 0
    for key in container:
        if cnt < cnt_from:
            cnt += 1
            continue
        if cnt_to and (cnt > cnt_to):
            break
        cnt += 1
        elem = container.get(key)
        yield elem
        # Drop our reference so the generator does not keep the last
        # yielded object alive while suspended.
        del elem


def partition(container, part_size=10000):
    """Partition `container` into chunks.

    Get a list of triples (index, start, end) which represent chunks of
    elements from `container`. `start` and `end` are zero-based and
    inclusive. The `container` object must support `len()`.

    Split length of `container` and tell what partitions we get, if each
    partition is size `part_size` or less.

    For instance a container of size 250 and `part_size` 100 would give:

      [(0,   0,  99),
       (1, 100, 199),
       (2, 200, 249),
       ]

    An empty container gives an empty list.
    """
    num = len(container)
    print("Container elements: %s" % num)
    return [
        (idx, start, min(start + part_size - 1, num - 1))
        for idx, start in enumerate(range(0, num, part_size))]


def get_mem_info():
    """Get current memory info (the `rss` value reported by psutil).

    This works only, if `psutil` is installed locally (in virtualenv).
    Otherwise we return `None`.
    """
    try:
        # late import. We do not want to make it a waeup.uniben dependency.
        import psutil
    except ImportError:
        return None
    proc = psutil.Process(os.getpid())
    # Current psutil names this `memory_info()`; the old
    # `get_memory_info()` alias only exists in old releases.
    mem_info = getattr(proc, 'memory_info', None)
    if mem_info is None:
        mem_info = proc.get_memory_info
    return mem_info().rss


def export_part(container, part_num, start, end, path):
    """Export part number `part_num` from `container` to `path`.

    `path` is the filesystem path we want to export to.

    `start` and `end` are the index numbers of the elements we want
    to export. Indexes are zero-based (starting with zero, not one)
    and `end` is inclusive.
    """
    gc.collect()
    mem1 = get_mem_info()
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem1))
    tp1 = time.time()
    exporter = getUtility(ICSVExporter, name=EXPORTER_NAME)
    # get_all_students() treats cnt_to=0 as "no upper bound", which would
    # turn a one-element part (start=0, end=0) into a full export. Bound
    # the iterator explicitly so `end` is always inclusive.
    items = itertools.islice(
        get_all_students(container, start), max(end - start + 1, 0))
    exporter.export(items, path)
    tp2 = time.time()
    mem2 = get_mem_info()
    print("  Done (%s secs, mem: %s)" % (tp2 - tp1, mem2))


def export():
    """Main function.

    Opens the databases, splits the `students` container of the
    `APP_NAME` university into parts (see partition()) and exports each
    part into its own CSV file in a fresh temporary directory. All
    connections and databases are closed when done, also on errors.
    """
    handle_options()
    closables = init_dbs()
    try:
        conn = closables[0].open()
        try:
            parts = partition(get_university(conn)['students'])
        finally:
            conn.close()
        workdir = tempfile.mkdtemp()
        t1 = time.time()
        for p_num, start, end in parts:
            # Every part gets a freshly opened connection (presumably to
            # keep memory use per part bounded -- the memory reporting in
            # export_part() suggests this was the goal).
            conn = closables[0].open()
            try:
                studs = get_university(conn)['students']
                curr_path = os.path.join(workdir, "myexport%s.csv" % p_num)
                export_part(studs, p_num, start, end, curr_path)
            finally:
                conn.close()
        print("Result in %s" % workdir)
        t2 = time.time()
        print("Elapsed: %s secs" % (t2 - t1))
    finally:
        close_dbs(closables)