source: main/waeup.uniben/trunk/src/waeup/uniben/scripts.py @ 13209

Last change on this file since 13209 was 13209, checked in by uli, 9 years ago

pep8.

File size: 5.5 KB
Line 
"""The export() function herein is called by the kofaexport script.

Memory usage is only reported if `psutil` is installed in your local
virtualenv (``pip install psutil``); otherwise it is shown as `None`.
"""
5import gc
6import grok
7import os
8import tempfile
9import time
10from ZODB import DB, DemoStorage, FileStorage
11from ZODB.blob import BlobStorage
12from zope.component import getUtility
13from waeup.kofa.interfaces import ICSVExporter
14
15
#: Packages grokked so far by `grok_bases()` (package name -> True).
grokked = {}


#: Packages that must be grokked to register the exporters (and
#: everything else). The order of these packages usually matters.
TO_BE_GROKKED = (
    "waeup.kofa",
    "kofacustom.nigeria",
    "waeup.uniben",
)

#: Data root holding ``filestorage/`` and ``blobstorage/``: the `var/`
#: dir next to `src/` (this file lives at
#: ``<root>/src/waeup/uniben/scripts.py``), normally the `var/` dir of
#: an instance.
VAR_DIR = os.path.join(
    os.path.dirname(
        os.path.dirname(
            os.path.dirname(
                os.path.dirname(__file__)))),
    'var')

#: Name of the `University` instance whose items get exported.
APP_NAME = "uniben"

#: Name under which the exporter to use is registered.
EXPORTER_NAME = "students"
36
37
def grok_bases():
    """Grok the packages in TO_BE_GROKKED that were not grokked yet.

    Packages are grokked in the order given by TO_BE_GROKKED. A package
    is recorded in the module-level `grokked` dict only after grokking
    it succeeded, so a failed attempt is retried on the next call
    instead of being silently skipped.
    """
    # `grok.testing` is a submodule: import it explicitly so the
    # attribute lookup below does not depend on it having been
    # imported somewhere else already.
    import grok.testing
    for pkg_name in TO_BE_GROKKED:
        if grokked.get(pkg_name, False):
            continue
        print("Grokking %s..." % pkg_name)
        grok.testing.grok(pkg_name)
        grokked[pkg_name] = True
        print("Done.")
48
49
def init_dbs():
    """Set up the databases (read-only) and grok the base packages.

    Opens ``Data.fs`` (with blobs from ``VAR_DIR/blobstorage``) and
    ``Data.async.fs`` from ``VAR_DIR/filestorage`` read-only, as two
    databases sharing one `databases` mapping, then calls
    `grok_bases()`.

    We return a sequence of `closables`::

      (main_db, async_db, base_storage, blob_storage, async_storage)

    The closables can be passed to close_dbs() when you're done.

    The first of the elements returned is the main database. open()
    it for a new connection to the ZODB (and close that connection
    when done).
    """
    # https://github.com/zopefoundation/ZODB/\
    #         blob/master/src/ZODB/cross-database-references.txt
    # Both DBs are registered in the same `databases` mapping, which is
    # how a ZODB multi-database is set up (see link above).
    databases = {}
    db_dir = os.path.join(VAR_DIR, 'filestorage')
    blob_dir = os.path.join(VAR_DIR, 'blobstorage')
    db_path = os.path.join(db_dir, 'Data.fs')
    async_db_path = os.path.join(db_dir, 'Data.async.fs')
    async_storage = FileStorage.FileStorage(async_db_path, read_only=True)
    async_db = DB(async_storage, database_name="async", databases=databases)
    base_storage = FileStorage.FileStorage(db_path, read_only=True)
    blob_storage = BlobStorage(blob_dir, base_storage)
    main_db = DB(blob_storage, databases=databases)
    # No connection is opened here: callers open (and close) their own
    # connections via main_db.open().
    grok_bases()
    return (main_db, async_db, base_storage, blob_storage, async_storage)
74
75
def close_dbs(closables):
    """Call ``close()`` on every item of `closables`, in order.

    Progress is printed before and after each item. If closing an item
    raises, the exception propagates and the remaining items are left
    untouched.
    """
    for closable in closables:
        print("Closing %s..." % closable)
        closable.close()
        print("Done.")
83
84
def get_university(conn):
    """Return the `University` instance named APP_NAME.

    It is looked up under the ``"Application"`` key of the root object
    of the ZODB connection `conn`.
    """
    applications = conn.root()["Application"]
    return applications[APP_NAME]
87
88
def get_all_students(container, cnt_from=0, cnt_to=None):
    """Yield the values of `container` with index `cnt_from` to `cnt_to`.

    Indexes are zero-based and both bounds are inclusive. This matches
    the ``(<num>, <index_start>, <index_end>)`` triples built by
    `partition()`. An index refers to the iteration order of
    `container`'s keys, and each value is fetched with
    ``container.get(key)``.

    `cnt_to` of `None` (the default) means "up to the last element".
    Note that ``cnt_to=0`` selects just the first element, because 0 is
    a valid end index (e.g. ``partition(c, part_size=1)`` yields
    ``(0, 0, 0)``).
    """
    for idx, key in enumerate(container):
        if idx < cnt_from:
            continue
        if cnt_to is not None and idx > cnt_to:
            break
        elem = container.get(key)
        yield elem
        # Drop our own reference before fetching the next element;
        # presumably so exported objects can be released early when
        # walking large containers.
        del elem
101
102
def partition(container, part_size=10000):
    """Split `container` into chunks of at most `part_size` elements.

    `container` must support `len()`; the number of elements is
    printed. The result is a list of triples
    ``(<num>, <index_start>, <index_end>)``, one per chunk, with
    zero-based, inclusive indexes.

    For instance a container of size 250 and `part_size` 100 would give:

      [(0,   0,  99),
       (1, 100, 199),
       (2, 200, 249),
       ]

    An empty container gives an empty list.
    """
    total = len(container)
    print("Container elements: %s" % total)
    last_index = total - 1
    chunks = []
    for number, first in enumerate(range(0, total, part_size)):
        last = min(first + part_size - 1, last_index)
        chunks.append((number, first, last))
    return chunks
127
128
def get_mem_info():
    """Get the resident set size (RSS) of the current process.

    This works only if `psutil` is installed locally (in virtualenv).
    Otherwise we return `None`.
    """
    try:
        # late import. We do not want to make it a waeup.uniben dependency.
        import psutil
    except ImportError:
        return None
    proc = psutil.Process(os.getpid())
    # psutil >= 2.0 names it memory_info(); the deprecated
    # get_memory_info() alias is gone since psutil 3.0. Support both.
    getter = getattr(proc, 'memory_info', None)
    if getter is None:
        getter = proc.get_memory_info
    return getter().rss
142
143
def export_part(container, part_num, start, end, path):
    """Export the elements `start` to `end` of `container` to `path`.

    `path` is the filesystem path the CSV data is written to. `start`
    and `end` are zero-based element indexes, as produced by
    `partition()`. `part_num` identifies the chunk; it is not used by
    this function itself. The writing is delegated to the
    `ICSVExporter` utility registered under EXPORTER_NAME. Memory usage
    (if available) and elapsed time are printed.
    """
    # Collect garbage first; presumably so the memory readings of
    # different chunks are comparable.
    gc.collect()
    mem_before = get_mem_info()
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem_before))
    started = time.time()
    getUtility(ICSVExporter, name=EXPORTER_NAME).export(
        get_all_students(container, start, end), path)
    elapsed = time.time() - started
    print("  Done (%s secs, mem: %s)" % (elapsed, get_mem_info()))
160
161
def export(parts=None):
    """Main function: export the students of the university in chunks.

    Opens the databases (see `init_dbs()`) and splits the ``students``
    container into chunks (see `partition()`). Each chunk is written to
    its own CSV file ``myexport<num>.csv`` in a fresh temporary
    directory, whose path is printed at the end.

    `parts` optionally overrides the computed chunks. It must be a
    sequence of ``(<num>, <index_start>, <index_end>)`` triples like
    those returned by `partition()`. For example, ``[(0, 0, 100)]``
    exports only the first 101 students.

    Every connection, and finally all databases/storages, are closed
    again, also if an export fails.
    """
    closables = init_dbs()
    main_db = closables[0]
    try:
        if parts is None:
            conn = main_db.open()
            try:
                parts = partition(get_university(conn)['students'])
            finally:
                conn.close()

        workdir = tempfile.mkdtemp()

        t1 = time.time()
        for p_num, start, end in parts:
            # every loop reopens the connection
            conn = main_db.open()
            try:
                studs = get_university(conn)['students']
                curr_path = os.path.join(workdir, "myexport%s.csv" % p_num)
                export_part(studs, p_num, start, end, curr_path)
            finally:
                conn.close()
        print("Result in %s" % workdir)
        t2 = time.time()
        print("Elapsed: %s secs" % (t2 - t1))
    finally:
        close_dbs(closables)
Note: See TracBrowser for help on using the repository browser.