"""The export() function herein is called by the kofaexport script.

To make this work, you have to pip install psutil in your local
virtualenv.
"""
import gc
import grok
import os
import tempfile
import time
from ZODB import DB, DemoStorage, FileStorage
from ZODB.blob import BlobStorage
from zope.component import getUtility
from waeup.kofa.interfaces import ICSVExporter

#: Remember what packages were grokked already.
grokked = dict()

#: Packages that have to be 'grokked' in order to register exporters
#: and everything else.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = ("waeup.kofa", "kofacustom.nigeria", "waeup.uniben")

#: The data root from where we can look out for Data.fs, etc.
#: We usually expect a `var/` directory in this root.
INSTANCE_HOME = os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(__file__))))


def grok_bases():
    """Grok packages not already grokked.

    Packages listed in `TO_BE_GROKKED` are grokked in order, each at
    most once per process (bookkeeping in the module-level `grokked`
    dict).
    """
    for pkg_name in TO_BE_GROKKED:
        if grokked.get(pkg_name, False):
            continue
        print("Grokking %s..." % pkg_name)
        grokked[pkg_name] = True
        grok.testing.grok(pkg_name)
        print("Done.")


def init_dbs():
    """Setup databases.

    We return a sequence of `closables`. The closables can be passed
    to close_dbs() if you're done.

    The first of the elements returned is the main database. open()
    it for a new connection to the ZODB.
    """
    # https://github.com/zopefoundation/ZODB/blob/master/src/ZODB/cross-database-references.txt
    databases = {}
    root_dir = INSTANCE_HOME
    db_dir = os.path.join(root_dir, 'var', 'filestorage')
    blob_dir = os.path.join(root_dir, 'var', 'blobstorage')
    db_path = os.path.join(db_dir, 'Data.fs')
    async_db_path = os.path.join(db_dir, 'Data.async.fs')
    # Both storages are opened read-only: we only export, never write.
    async_storage = FileStorage.FileStorage(async_db_path, read_only=True)
    db1 = DB(async_storage, database_name="async", databases=databases)
    base_storage = FileStorage.FileStorage(db_path, read_only=True)
    blob_storage = BlobStorage(blob_dir, base_storage)
    db2 = DB(blob_storage, databases=databases)
    # NOTE(review): this connection is opened and never closed nor
    # used afterwards -- presumably a deliberate warm-up; confirm
    # whether it can be dropped or should be closed.
    db2.open()
    grok_bases()
    return (db2, db1, base_storage, blob_storage, async_storage)


def close_dbs(closables):
    """Close all elements in `closables`.
    """
    for elem in closables:
        print("Closing %s..." % elem)
        elem.close()
        print("Done.")


def get_university(conn):
    # Fetch the site object from the ZODB root of connection `conn`.
    return conn.root()["Application"]["uniben"]


def export():
    """Main function.

    Partitions the students container, then exports each partition
    into its own CSV file in a fresh temporary directory. The ZODB
    connection is reopened for every partition to keep memory usage
    bounded.
    """
    closables = init_dbs()
    conn = closables[0].open()
    uni = get_university(conn)
    studs = uni['students']
    parts = partition(studs)
    print(parts)
    workdir = tempfile.mkdtemp()
    t1 = time.time()
    conn.close()
    # every loop will reopen the connection
    for p_num, start, end in parts:
        conn = closables[0].open()
        uni = get_university(conn)
        studs = uni['students']
        curr_path = os.path.join(workdir, "myexport%s.csv" % p_num)
        export_part(studs, p_num, start, end, curr_path)
        conn.close()
    print("Result in %s" % workdir)
    t2 = time.time()
    print("Elapsed: %s secs" % (t2 - t1))


def export_part(container, part_num, start, end, path):
    """Export students numbered `start`..`end` (inclusive) from
    `container` to the CSV file at `path`, reporting time and memory
    use before and after.
    """
    import psutil  # late import. Don't want to make it waeup.uniben req.
    PROC = psutil.Process(os.getpid())
    # `get_memory_info` was removed in psutil 3.0; prefer the modern
    # `memory_info` but stay compatible with ancient installations.
    _meminfo = getattr(PROC, 'memory_info', None) or PROC.get_memory_info
    gc.collect()
    mem1 = _meminfo().rss
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem1))
    tp1 = time.time()
    exporter = getUtility(ICSVExporter, name="students")
    exporter.export(get_all_students(container, start, end), path)
    tp2 = time.time()
    mem2 = _meminfo().rss
    print("  Done (%s secs, mem: %s)" % (tp2 - tp1, mem2))


def get_all_students(container, cnt_from=0, cnt_to=0):
    """Iterate over students number `cnt_from` to `cnt_to` (inclusive)
    in `container`, yielding them one at a time.

    A `cnt_to` of zero means: no upper bound.
    """
    cnt = 0
    for key in container:
        if cnt < cnt_from:
            cnt += 1
            continue
        if cnt_to and (cnt > cnt_to):
            break
        cnt += 1
        yield container.get(key)


def partition(container, part_size=10000):
    """Partition `container` into chunks.

    Get a list of triples (part_num, start_index, end_index) which
    represent chunks of elements from `container`. Indexes are
    inclusive. The `container` object must support `len()`.

    Split length of `container` and tell what partitions we get, if
    each partition is size `part_size` or less.

    For instance a container of size 250 and `part_size` 100 would
    give:

      [(0, 0, 99), (1, 100, 199), (2, 200, 249), ]
    """
    num = len(container)
    print("Container elements: %s" % num)
    return [(idx, start, min(start + part_size - 1, num - 1))
            for idx, start in enumerate(range(0, num, part_size))]