"""The export() function herein is called by the kofaexport script.

To get memory usage figures in the output, pip install psutil in your
local virtualenv. Without psutil the export still runs, but the memory
usage is reported as `None`.
"""
---|
[13210] | 5 | import argparse |
---|
[13190] | 6 | import gc |
---|
| 7 | import grok |
---|
| 8 | import os |
---|
| 9 | import tempfile |
---|
| 10 | import time |
---|
| 11 | from ZODB import DB, DemoStorage, FileStorage |
---|
| 12 | from ZODB.blob import BlobStorage |
---|
| 13 | from zope.component import getUtility |
---|
| 14 | from waeup.kofa.interfaces import ICSVExporter |
---|
| 15 | |
---|
| 16 | |
---|
#: Registry of packages that were grokked already (package name -> True).
grokked = {}


#: Packages that must be 'grokked' so that exporters (and everything
#: else) get registered.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = (
    "waeup.kofa",
    "kofacustom.nigeria",
    "waeup.uniben",
)

#: The data root in which we look for Data.fs, etc. Normally this is
#: the `var/` dir of an instance; it is located four directory levels
#: above this file.
VAR_DIR = os.path.join(
    os.path.dirname(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__)))),
    'var')

#: Name of the `University` instance in which we look for items to
#: export.
APP_NAME = "uniben"

#: Name under which the exporter to use is registered.
EXPORTER_NAME = "students"
---|
[13203] | 37 | |
---|
[13209] | 38 | |
---|
def handle_options():
    """Parse the commandline and return the resulting namespace.

    No options are defined yet, so the parser only provides the
    standard `argparse` behaviour (``--help``, errors for unknown
    arguments).
    """
    return argparse.ArgumentParser(
        description="Export WAeUP kofa data").parse_args()
---|
| 46 | |
---|
| 47 | |
---|
def grok_bases():
    """Grok every package from `TO_BE_GROKKED` not grokked before.

    Packages are processed in the order given by `TO_BE_GROKKED`.
    Each processed package is recorded in the module-level `grokked`
    registry, so repeated calls do not grok a package twice.
    """
    for pkg_name in TO_BE_GROKKED:
        if not grokked.get(pkg_name, False):
            print("Grokking %s..." % pkg_name)
            # The package is marked before the actual grokking starts.
            grokked[pkg_name] = True
            grok.testing.grok(pkg_name)
            print("Done.")
---|
| 58 | |
---|
| 59 | |
---|
def init_dbs():
    """Set up the databases and grok the required packages.

    The storages are looked up below `VAR_DIR` and opened read-only.

    Returns a tuple of `closables`, which can be handed over to
    close_dbs() when they are not needed any more. The first element
    of the tuple is the main database; call open() on it to get a new
    connection to the ZODB.
    """
    # Both databases share one `databases` mapping. See
    # https://github.com/zopefoundation/ZODB/\
    #     blob/master/src/ZODB/cross-database-references.txt
    shared_dbs = {}
    filestorage_dir = os.path.join(VAR_DIR, 'filestorage')
    blobstorage_dir = os.path.join(VAR_DIR, 'blobstorage')
    main_fs_path = os.path.join(filestorage_dir, 'Data.fs')
    async_fs_path = os.path.join(filestorage_dir, 'Data.async.fs')

    # The "async" database is set up first, then the main one.
    async_storage = FileStorage.FileStorage(async_fs_path, read_only=True)
    async_db = DB(async_storage, database_name="async", databases=shared_dbs)

    base_storage = FileStorage.FileStorage(main_fs_path, read_only=True)
    blob_storage = BlobStorage(blobstorage_dir, base_storage)
    main_db = DB(blob_storage, databases=shared_dbs)
    # NOTE(review): the connection returned here is discarded --
    # confirm whether opening it once is needed at all.
    main_db.open()
    grok_bases()
    return (main_db, async_db, base_storage, blob_storage, async_storage)
---|
| 84 | |
---|
| 85 | |
---|
def close_dbs(closables):
    """Call close() on each item of `closables`, in the given order.

    Progress is reported on stdout for every item.
    """
    for closable in closables:
        print("Closing %s..." % closable)
        closable.close()
        print("Done.")
---|
| 93 | |
---|
| 94 | |
---|
def get_university(conn):
    """Get the object named `APP_NAME` from the ZODB connection `conn`.

    The object is looked up in the ``Application`` entry of the
    connection's root.
    """
    application = conn.root()["Application"]
    return application[APP_NAME]
---|
[13190] | 97 | |
---|
| 98 | |
---|
def get_all_students(container, cnt_from=0, cnt_to=0):
    """Yield the values of `container` for a range of key positions.

    Keys are taken in the iteration order of `container`. `cnt_from`
    and `cnt_to` are zero-based positions and both are inclusive. A
    `cnt_to` of zero means 'no upper limit'.

    Values are looked up one at a time via ``container.get(key)``.
    """
    for position, key in enumerate(container):
        if position < cnt_from:
            # not in the requested range yet
            continue
        if cnt_to and position > cnt_to:
            # beyond the requested range
            break
        # The value is yielded directly, so no local reference to it
        # is kept by this generator.
        yield container.get(key)
---|
| 111 | |
---|
| 112 | |
---|
def partition(container, part_size=10000):
    """Compute the chunks `container` can be split into.

    Returns a list of triples ``(<num>, <index_start>, <index_end>)``,
    one for each chunk. Index values are zero-based and inclusive.
    Each chunk covers `part_size` elements, except the last one, which
    might be smaller.

    `container` must support `len()`; its size is also printed to
    stdout.

    A container with 250 elements and a `part_size` of 100, for
    instance, results in:

      [(0, 0, 99),
       (1, 100, 199),
       (2, 200, 249),
      ]

    """
    total = len(container)
    print("Container elements: %s" % total)
    last_index = total - 1
    chunks = []
    for first_index in range(0, total, part_size):
        chunk_end = min(first_index + part_size - 1, last_index)
        # the chunk number equals the number of chunks collected so far
        chunks.append((len(chunks), first_index, chunk_end))
    return chunks
---|
[13202] | 137 | |
---|
| 138 | |
---|
def get_mem_info():
    """Get current memory info.

    Returns the `rss` value psutil reports for the current process.

    This works only, if `psutil` is installed locally (in virtualenv).
    Otherwise we return `None`.
    """
    try:
        # late import. We do not want to make it a waeup.uniben dependency.
        import psutil
    except ImportError:
        return None
    proc = psutil.Process(os.getpid())
    # psutil 2.0 renamed `get_memory_info()` to `memory_info()` and
    # psutil 3.0 removed the old name. Prefer the new name, but keep
    # working with old psutil releases.
    read_mem_info = getattr(proc, 'memory_info', None)
    if read_mem_info is None:
        read_mem_info = proc.get_memory_info
    return read_mem_info().rss
---|
| 152 | |
---|
| 153 | |
---|
def export_part(container, part_num, start, end, path):
    """Export one chunk of `container` to the file `path`.

    `part_num` is the number of the chunk (currently not evaluated
    here). `start` and `end` are the zero-based index numbers of the
    first and last element to export. `path` is the filesystem path
    of the file to write.

    The export is done by the `ICSVExporter` utility registered under
    `EXPORTER_NAME`. Memory usage and the time needed are printed to
    stdout.
    """
    # Run the garbage collector before the memory usage is read.
    gc.collect()
    mem_before = get_mem_info()
    print(" Export %s-%s to %s (mem: %s)" % (start, end, path, mem_before))
    started = time.time()
    students = get_all_students(container, start, end)
    csv_exporter = getUtility(ICSVExporter, name=EXPORTER_NAME)
    csv_exporter.export(students, path)
    finished = time.time()
    mem_after = get_mem_info()
    print(" Done (%s secs, mem: %s)" % (finished - started, mem_after))
---|
| 170 | |
---|
| 171 | |
---|
def export():
    """Main function.

    Open the databases, look up the ``students`` container of the
    university named `APP_NAME` and export chunks of it to CSV files
    (``myexport<num>.csv``) in a freshly created temporary directory.
    The path of this directory and the time elapsed are printed when
    the export is finished.

    A new ZODB connection is opened for each chunk and closed again
    afterwards. All databases and storages are closed on exit, also if
    the export fails.
    """
    # Parse the commandline for its side effects only (``--help``,
    # errors for unknown arguments); there are no options to evaluate.
    handle_options()
    closables = init_dbs()
    try:
        main_db = closables[0]
        conn = main_db.open()
        try:
            parts = partition(get_university(conn)['students'])
        finally:
            conn.close()  # every loop will reopen the connection

        # NOTE(review): this hard-coded value overrides the partitions
        # computed above, so only the elements 0 to 100 are exported.
        # It looks like a leftover from testing -- remove this line to
        # export the complete container.
        parts = [(0, 0, 100), ]

        workdir = tempfile.mkdtemp()

        t1 = time.time()
        for p_num, start, end in parts:
            conn = main_db.open()
            try:
                studs = get_university(conn)['students']
                curr_path = os.path.join(workdir, "myexport%s.csv" % p_num)
                export_part(studs, p_num, start, end, curr_path)
            finally:
                # release the connection, even if this chunk failed
                conn.close()
        print("Result in %s" % workdir)
        t2 = time.time()
        print("Elapsed: %s secs" % (t2 - t1))
    finally:
        # init_dbs() tells us to pass its result to close_dbs() if
        # we are done.
        close_dbs(closables)
---|