source: main/waeup.uniben/trunk/src/waeup/uniben/scripts.py @ 13202

Last change on this file since 13202 was 13202, checked in by uli, 9 years ago

Reorder functions.

File size: 4.8 KB
RevLine 
[13190]1"""The export() function herein is called by the kofaexport script.
2
3To make this work, you have to pip install psutil in your local virtualenv.
4"""
5import gc
6import grok
7import os
8import tempfile
9import time
10from ZODB import DB, DemoStorage, FileStorage
11from ZODB.blob import BlobStorage
12from zope.component import getUtility
13from waeup.kofa.interfaces import ICSVExporter
14
15
#: Remember what packages were grokked already.
#: Maps package name (str) -> True once grok.testing.grok() ran for it,
#: so repeated grok_bases() calls are cheap no-ops.
grokked = dict()


#: Packages that have to be 'grokked' in order to register exporters
#: and everything else.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = ("waeup.kofa", "kofacustom.nigeria", "waeup.uniben")

#: The data root from where we can look out for Data.fs, etc.
#: We usually expect a `var/` directory in this root.
#: Computed as four directory levels above this module file.
INSTANCE_HOME = os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(__file__))))
29
30
def grok_bases():
    """Grok every package in `TO_BE_GROKKED` not already grokked.

    Uses the module-level `grokked` dict to make repeated calls
    idempotent; grokking registers exporters and other components.
    """
    for name in TO_BE_GROKKED:
        if not grokked.get(name):
            print("Grokking %s..." % name)
            grokked[name] = True
            grok.testing.grok(name)
            print("Done.")
41
42
def init_dbs():
    """Setup databases.

    We return a sequence of `closables`. The closables can be passed
    to close_dbs() if you're done.

    The first of the elements returned is the main database. open()
    it for a new connection to the ZODB.
    """
    # Multi-database setup, see
    # https://github.com/zopefoundation/ZODB/\
    #         blob/master/src/ZODB/cross-database-references.txt
    db_map = {}
    fs_dir = os.path.join(INSTANCE_HOME, 'var', 'filestorage')
    blob_dir = os.path.join(INSTANCE_HOME, 'var', 'blobstorage')
    # The async database must be registered in `db_map` before the
    # main one; both storages are opened read-only for safe exporting.
    async_storage = FileStorage.FileStorage(
        os.path.join(fs_dir, 'Data.async.fs'), read_only=True)
    async_db = DB(async_storage, database_name="async", databases=db_map)
    base_storage = FileStorage.FileStorage(
        os.path.join(fs_dir, 'Data.fs'), read_only=True)
    blob_storage = BlobStorage(blob_dir, base_storage)
    main_db = DB(blob_storage, databases=db_map)
    main_db.open()
    grok_bases()
    return (main_db, async_db, base_storage, blob_storage, async_storage)
68
69
def close_dbs(closables):
    """Call `close()` on every element of `closables`.

    Progress is reported on stdout; returns None.
    """
    for closable in closables:
        print("Closing %s..." % closable)
        closable.close()
        print("Done.")
77
78
def get_university(conn):
    """Return the Uniben university object from ZODB connection `conn`."""
    app_root = conn.root()["Application"]
    return app_root["uniben"]
81
82
def get_all_students(container, cnt_from=0, cnt_to=0):
    """Yield elements of `container` at indices `cnt_from` through
    `cnt_to` (both inclusive, in iteration order).

    A false-y `cnt_to` (the default 0) means "no upper bound".
    Elements are fetched lazily via ``container.get(key)``.
    """
    for idx, key in enumerate(container):
        if idx < cnt_from:
            continue
        if cnt_to and idx > cnt_to:
            break
        student = container.get(key)
        yield student
        # Drop our reference immediately so large objects can be
        # garbage-collected between iterations.
        del student
95
96
def partition(container, part_size=10000):
    """Partition `container` into chunks.

    Get a list of triples (<num>, <index_start>, <index_end>) which
    represent chunks of elements from `container`.

    The `container` object must support `len()`.

    Split length of `container` and tell what partitions we get, if each
    partition is size `part_size` or less.

    For instance a container of size 250 and `part_size` 100 would give:

      [(0,   0,  99),
       (1, 100, 199),
       (2, 200, 249),
       ]

    """
    total = len(container)
    print("Container elements: %s" % total)
    chunks = []
    for num, begin in enumerate(range(0, total, part_size)):
        # Last chunk is clipped to the container's final index.
        chunks.append((num, begin, min(begin + part_size - 1, total - 1)))
    return chunks
[13202]121
122
def export_part(container, part_num, start, end, path):
    """Export students at indices `start` through `end` (inclusive)
    from `container` to a CSV file at `path`.

    `part_num` is the chunk number; callers use it to build `path`.
    Memory usage (RSS) and elapsed time are printed to stdout.
    """
    import psutil  # late import. Don't want to make it waeup.uniben req.
    proc = psutil.Process(os.getpid())
    # psutil 2.0 renamed Process.get_memory_info() to memory_info() and
    # later releases removed the old name entirely; support both APIs.
    mem_of = getattr(proc, 'memory_info', None) or proc.get_memory_info
    gc.collect()  # reduce allocator noise in the memory figures below
    mem1 = mem_of().rss
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem1))
    tp1 = time.time()
    exporter = getUtility(ICSVExporter, name="students")
    # Stream students lazily so we never hold the whole chunk in memory.
    exporter.export(get_all_students(container, start, end), path)
    tp2 = time.time()
    mem2 = mem_of().rss
    print("  Done (%s secs, mem: %s)" % (tp2 - tp1, mem2))
135
136
def export():
    """Main function.

    Open the databases, partition the students container into chunks
    and export each chunk to its own CSV file below a fresh temporary
    directory. The connection is reopened per chunk to keep the ZODB
    object cache (and thus memory usage) small.
    """
    closables = init_dbs()
    main_db = closables[0]
    conn = main_db.open()
    students = get_university(conn)['students']

    parts = partition(students)
    print(parts)

    workdir = tempfile.mkdtemp()

    t_start = time.time()
    conn.close()  # every loop will reopen the connection
    for p_num, start, end in parts:
        conn = main_db.open()
        students = get_university(conn)['students']
        out_path = os.path.join(workdir, "myexport%s.csv" % p_num)
        export_part(students, p_num, start, end, out_path)
        conn.close()
    print("Result in %s" % workdir)
    print("Elapsed: %s secs" % (time.time() - t_start))
Note: See TracBrowser for help on using the repository browser.