source: main/waeup.uniben/trunk/src/waeup/uniben/scripts.py @ 13195

Last change on this file since 13195 was 13195, checked in by uli, 9 years ago

Don't fall back into pdb when done.

File size: 4.8 KB
Line 
1"""The export() function herein is called by the kofaexport script.
2
3To make this work, you have to pip install psutil in your local virtualenv.
4"""
5import gc
6import grok
7import os
8import tempfile
9import time
10from ZODB import DB, DemoStorage, FileStorage
11from ZODB.blob import BlobStorage
12from zope.component import getUtility
13from waeup.kofa.interfaces import ICSVExporter
14
15
#: Remember what packages were grokked already.
#: Maps package name -> True once grok_bases() processed it, so
#: repeated calls are cheap no-ops.
grokked = dict()


#: Packages that have to be 'grokked' in order to register exporters
#: and everything else.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = ("waeup.kofa", "kofacustom.nigeria", "waeup.uniben")

#: The data root from where we can look out for Data.fs, etc.
#: We usually expect a `var/` directory in this root.
#: Four dirname() hops up from this file; assumes the layout
#: <root>/src/waeup/uniben/scripts.py -- TODO confirm for eggs/installs.
INSTANCE_HOME = os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(__file__))))
29
30
def grok_bases():
    """Grok every package in `TO_BE_GROKKED` exactly once.

    Packages already registered in the module-level `grokked` dict are
    skipped; the rest are grokked in declaration order and then marked
    as done so subsequent calls become no-ops.
    """
    pending = (pkg for pkg in TO_BE_GROKKED if not grokked.get(pkg, False))
    for pkg in pending:
        print("Grokking %s..." % pkg)
        grokked[pkg] = True
        grok.testing.grok(pkg)
        print("Done.")
41
42
def init_dbs():
    """Setup databases.

    We return a sequence of `closables`. The closables can be passed
    to close_dbs() if you're done.

    The first of the elements returned is the main database. open()
    it for a new connection to the ZODB.
    """
    # Multi-database setup, cf.
    # https://github.com/zopefoundation/ZODB/\
    #         blob/master/src/ZODB/cross-database-references.txt
    databases = {}
    filestorage_dir = os.path.join(INSTANCE_HOME, 'var', 'filestorage')
    blobstorage_dir = os.path.join(INSTANCE_HOME, 'var', 'blobstorage')
    # Register the read-only async DB first under its own name.
    async_storage = FileStorage.FileStorage(
        os.path.join(filestorage_dir, 'Data.async.fs'), read_only=True)
    async_db = DB(async_storage, database_name="async", databases=databases)
    # The main DB wraps its (read-only) file storage with blob support.
    main_storage = FileStorage.FileStorage(
        os.path.join(filestorage_dir, 'Data.fs'), read_only=True)
    blob_storage = BlobStorage(blobstorage_dir, main_storage)
    main_db = DB(blob_storage, databases=databases)
    main_db.open()
    grok_bases()
    return (main_db, async_db, main_storage, blob_storage, async_storage)
68
69
def close_dbs(closables):
    """Call close() on each element of `closables`, in order.

    Progress is reported on stdout so long-running teardowns stay
    observable.
    """
    for closable in closables:
        print("Closing %s..." % closable)
        closable.close()
        print("Done.")
77
78
def get_university(conn):
    """Return the Uniben application object from ZODB connection `conn`."""
    zodb_root = conn.root()
    return zodb_root["Application"]["uniben"]
81
82
def export():
    """Main function.

    Open the databases, partition the students container into chunks
    and export each chunk into its own CSV file below a fresh temp
    directory. The ZODB connection is reopened per chunk to keep the
    connection cache (and thus memory) small.
    """
    closables = init_dbs()
    main_db = closables[0]
    conn = main_db.open()
    students = get_university(conn)['students']

    parts = partition(students)
    print(parts)

    workdir = tempfile.mkdtemp()

    started = time.time()
    conn.close()  # every loop turn reopens the connection
    for part_num, start, end in parts:
        conn = main_db.open()
        students = get_university(conn)['students']
        csv_path = os.path.join(workdir, "myexport%s.csv" % part_num)
        export_part(students, part_num, start, end, csv_path)
        conn.close()
    print("Result in %s" % workdir)
    finished = time.time()
    print("Elapsed: %s secs" % (finished - started))
113
114
def export_part(container, part_num, start, end, path):
    """Export students with index `start`..`end` of `container` to `path`.

    The actual CSV writing is done by the registered 'students'
    exporter utility; `part_num` is not used here (callers encode it
    in `path` already). RSS memory usage is printed before and after
    the run to make leaks visible across chunks.
    """
    import psutil  # late import. Don't want to make it waeup.uniben req.
    proc = psutil.Process(os.getpid())

    def _rss():
        # psutil 2.0 renamed get_memory_info() to memory_info() and the
        # old name was removed in psutil 3.0 -- support both spellings.
        getter = getattr(proc, "memory_info", None) or proc.get_memory_info
        return getter().rss

    gc.collect()
    mem1 = _rss()
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem1))
    tp1 = time.time()
    exporter = getUtility(ICSVExporter, name="students")
    exporter.export(get_all_students(container, start, end), path)
    tp2 = time.time()
    mem2 = _rss()
    print("  Done (%s secs, mem: %s)" % (tp2 - tp1, mem2))
127
128
def get_all_students(container, cnt_from=0, cnt_to=0):
    """Lazily yield elements of `container` with index in [cnt_from, cnt_to].

    A `cnt_to` of 0 means "no upper bound". Elements are fetched one at
    a time via container.get() and dereferenced right after being
    yielded to keep the working set small.
    """
    for idx, key in enumerate(container):
        if idx < cnt_from:
            continue
        if cnt_to and idx > cnt_to:
            break
        item = container.get(key)
        yield item
        del item
141
142
def partition(container, part_size=10000):
    """Partition `container` into chunks.

    Return a list of triples (<num>, <index_start>, <index_end>)
    describing consecutive chunks of at most `part_size` elements;
    both indexes are inclusive. `container` only needs to support
    `len()`.

    For instance a container of size 250 and `part_size` 100 gives:

      [(0,   0,  99),
       (1, 100, 199),
       (2, 200, 249),
       ]

    """
    total = len(container)
    print("Container elements: %s" % total)
    chunks = []
    for num, start in enumerate(range(0, total, part_size)):
        end = start + part_size - 1
        if end > total - 1:
            end = total - 1
        chunks.append((num, start, end))
    return chunks
Note: See TracBrowser for help on using the repository browser.