source: main/waeup.uniben/trunk/src/waeup/uniben/scripts.py @ 13203

Last change on this file since 13203 was 13203, checked in by uli, 10 years ago

Make app name a const (parameter).

File size: 4.9 KB
RevLine 
[13190]1"""The export() function herein is called by the kofaexport script.
2
3To make this work, you have to pip install psutil in your local virtualenv.
4"""
5import gc
6import grok
7import os
8import tempfile
9import time
10from ZODB import DB, DemoStorage, FileStorage
11from ZODB.blob import BlobStorage
12from zope.component import getUtility
13from waeup.kofa.interfaces import ICSVExporter
14
15
#: Remember what packages were grokked already.
#: Maps package name -> True once `grok_bases()` processed the package,
#: so each package is grokked at most once per process.
grokked = dict()


#: Packages that have to be 'grokked' in order to register exporters
#: and everything else.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = ("waeup.kofa", "kofacustom.nigeria", "waeup.uniben")

#: The data root from where we can look out for Data.fs, etc.
#: We usually expect a `var/` directory in this root.
#: Computed by climbing four directory levels up from this module.
INSTANCE_HOME = os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(__file__))))

#: The name of the `University` instance in which we look for export
#: items (the key under ``root["Application"]`` in the ZODB).
APP_NAME = "uniben"
[13203]34
def grok_bases():
    """Grok every package in `TO_BE_GROKKED` that is not grokked yet.

    Grokking registers the exporters (and all other components) of a
    package.  The module-level `grokked` dict tracks which packages were
    already processed, so repeated calls are cheap no-ops.
    """
    for name in TO_BE_GROKKED:
        if not grokked.get(name, False):
            print("Grokking %s..." % name)
            grokked[name] = True
            grok.testing.grok(name)
            print("Done.")
45
46
def init_dbs():
    """Set up the ZODB databases (read-only).

    We return a sequence of `closables`.  The closables can be passed
    to close_dbs() if you're done.

    The first of the elements returned is the main database.  open()
    it for a new connection to the ZODB.
    """
    # Multi-database setup, cf.
    # https://github.com/zopefoundation/ZODB/\
    #         blob/master/src/ZODB/cross-database-references.txt
    db_map = {}
    fs_dir = os.path.join(INSTANCE_HOME, 'var', 'filestorage')
    blob_dir = os.path.join(INSTANCE_HOME, 'var', 'blobstorage')
    # The async database must be registered first so that the main
    # database can resolve cross-database references into it.
    async_storage = FileStorage.FileStorage(
        os.path.join(fs_dir, 'Data.async.fs'), read_only=True)
    async_db = DB(async_storage, database_name="async", databases=db_map)
    main_storage = FileStorage.FileStorage(
        os.path.join(fs_dir, 'Data.fs'), read_only=True)
    blob_storage = BlobStorage(blob_dir, main_storage)
    main_db = DB(blob_storage, databases=db_map)
    main_db.open()
    grok_bases()
    return (main_db, async_db, main_storage, blob_storage, async_storage)
72
73
def close_dbs(closables):
    """Close all elements in `closables`.

    Each element only needs to provide a ``close()`` method; progress
    is reported on stdout.
    """
    for closable in closables:
        print("Closing %s..." % closable)
        closable.close()
        print("Done.")
81
82
def get_university(conn):
    """Return the `University` object of ZODB connection `conn`.

    Looks up the application named `APP_NAME` below the connection's
    ``Application`` root entry.
    """
    zodb_root = conn.root()
    return zodb_root["Application"][APP_NAME]
[13190]85
86
def get_all_students(container, cnt_from=0, cnt_to=0):
    """Lazily yield the students in `container` one by one.

    Items at positions before `cnt_from` are skipped; if `cnt_to` is
    non-zero, iteration stops once the position exceeds `cnt_to`
    (i.e. `cnt_to` is inclusive).
    """
    pos = 0
    for key in container:
        if pos < cnt_from:
            pos += 1
            continue
        if cnt_to and pos > cnt_to:
            break
        pos += 1
        student = container.get(key)
        yield student
        # Drop our reference immediately to ease memory pressure when
        # iterating over very large containers.
        del student
99
100
def partition(container, part_size=10000):
    """Partition `container` into chunks.

    Get a list of triples (<num>, <index_start>, <index_end>) which
    represent chunks of elements from `container`.

    The `container` object must support `len()`.

    Split length of `container` and tell what partitions we get, if each
    partition is size `part_size` or less.

    For instance a container of size 250 and `part_size` 100 would give:

      [(0,   0,  99),
       (1, 100, 199),
       (2, 200, 249),
       ]

    """
    total = len(container)
    print("Container elements: %s" % total)
    chunks = []
    for num, start in enumerate(range(0, total, part_size)):
        # Last chunk may be shorter; clamp its end index.
        chunks.append((num, start, min(start + part_size - 1, total - 1)))
    return chunks
[13202]125
126
def export_part(container, part_num, start, end, path):
    """Export students at positions `start`..`end` (inclusive) of
    `container` to a CSV file at `path`.

    `part_num` identifies the chunk for the caller's bookkeeping.
    Prints elapsed time and RSS memory usage before and after the
    export run.
    """
    import psutil  # late import. Don't want to make it waeup.uniben req.
    proc = psutil.Process(os.getpid())

    def _rss():
        # BUGFIX: psutil renamed `get_memory_info()` to `memory_info()`
        # (deprecated in psutil 2.0/3.0, later removed).  Prefer the
        # modern name but fall back for very old psutil versions.
        get_info = getattr(proc, "memory_info", None) or proc.get_memory_info
        return get_info().rss

    gc.collect()  # reclaim garbage from any previous chunk first
    mem1 = _rss()
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem1))
    tp1 = time.time()
    exporter = getUtility(ICSVExporter, name="students")
    exporter.export(get_all_students(container, start, end), path)
    tp2 = time.time()
    mem2 = _rss()
    print("  Done (%s secs, mem: %s)" % (tp2 - tp1, mem2))
139
140
def export():
    """Main function, called by the `kofaexport` script.

    Exports all students chunk-wise into CSV files inside a freshly
    created temporary directory.  The ZODB connection is closed and
    reopened for every chunk to keep memory usage bounded.
    """
    closables = init_dbs()
    main_db = closables[0]
    conn = main_db.open()
    students = get_university(conn)['students']

    chunks = partition(students)
    print(chunks)

    #chunks = [(0, 0, 100), ]
    #chunks = [(0, 0, 4999), (1, 5000, 9999), (2, 10000, 14999)]
    #chunks = [(0, 0, 14999), ]
    #chunks = [(0, 0, 160000), ]

    workdir = tempfile.mkdtemp()

    t1 = time.time()
    conn.close()  # every loop turn reopens the connection
    for num, start, end in chunks:
        conn = main_db.open()
        students = get_university(conn)['students']
        out_path = os.path.join(workdir, "myexport%s.csv" % num)
        export_part(students, num, start, end, out_path)
        conn.close()
    print("Result in %s" % workdir)
    t2 = time.time()
    print("Elapsed: %s secs" % (t2 - t1))
Note: See TracBrowser for help on using the repository browser.