source: main/waeup.uniben/trunk/src/waeup/uniben/scripts.py @ 13193

Last change on this file since 13193 was 13193, checked in by uli, 9 years ago

Fix partition().

Return last element with proper end value.

File size: 4.9 KB
Line 
1"""The export() function herein is called by the kofaexport script.
2
3To make this work, you have to pip install psutil in your local virtualenv.
4"""
5import gc
6import grok
7import os
8import tempfile
9import time
10from ZODB import DB, DemoStorage, FileStorage
11from ZODB.blob import BlobStorage
12from zope.component import getUtility
13from waeup.kofa.interfaces import ICSVExporter
14
15
#: Registry of the packages that were grokked already. Keys are
#: package names; `grok_bases()` stores a true value for each of them.
grokked = {}


#: Names of the packages that have to be 'grokked' so that exporters
#: (and everything else) get registered.
#: The order of these packages is usually relevant.
TO_BE_GROKKED = (
    "waeup.kofa",
    "kofacustom.nigeria",
    "waeup.uniben",
)

#: The data root from where we look out for Data.fs, etc. It is derived
#: by applying `dirname` four times to the path of this file.
#: We usually expect a `var/` directory in this root.
INSTANCE_HOME = os.path.dirname(
    os.path.dirname(
        os.path.dirname(
            os.path.dirname(__file__))))
29
30
def grok_bases():
    """Grok all packages from `TO_BE_GROKKED` not grokked before.

    Packages are handled in the order they are listed. Each handled
    package is remembered in the module-level `grokked` dict, so that
    further calls skip it.
    """
    for name in TO_BE_GROKKED:
        already_done = grokked.get(name, False)
        if not already_done:
            print("Grokking %s..." % name)
            # The package is marked as done before it is actually grokked.
            grokked[name] = True
            grok.testing.grok(name)
            print("Done.")
41
42
def init_dbs():
    """Open the ZODB databases found below `INSTANCE_HOME` (read-only).

    The storages are expected in ``var/filestorage`` (``Data.fs`` and
    ``Data.async.fs``) and ``var/blobstorage``. After the databases are
    set up, all required packages are grokked.

    Returns a tuple of `closables`, which can be handed over to
    close_dbs() when you are done.

    The first element of that tuple is the main database. open() it to
    get a new connection to the ZODB.
    """
    # Both databases share one mapping, see
    # https://github.com/zopefoundation/ZODB/blob/master/src/ZODB/cross-database-references.txt
    shared_databases = {}

    var_dir = os.path.join(INSTANCE_HOME, 'var')
    filestorage_dir = os.path.join(var_dir, 'filestorage')
    blobstorage_dir = os.path.join(var_dir, 'blobstorage')
    main_fs_path = os.path.join(filestorage_dir, 'Data.fs')
    async_fs_path = os.path.join(filestorage_dir, 'Data.async.fs')

    # The "async" database comes first...
    async_storage = FileStorage.FileStorage(async_fs_path, read_only=True)
    async_db = DB(
        async_storage, database_name="async", databases=shared_databases)

    # ...followed by the main database, which is wrapped for blob support.
    base_storage = FileStorage.FileStorage(main_fs_path, read_only=True)
    blob_storage = BlobStorage(blobstorage_dir, base_storage)
    main_db = DB(blob_storage, databases=shared_databases)

    # The connection returned by this call is not kept.
    main_db.open()
    grok_bases()
    return (main_db, async_db, base_storage, blob_storage, async_storage)
67
68
def close_dbs(closables):
    """Call `close()` on each element of `closables`, in the given order.

    A short progress message is printed before and after each element
    is closed.
    """
    for closable in closables:
        announcement = "Closing %s..." % closable
        print(announcement)
        closable.close()
        print("Done.")
76
77
def get_university(conn):
    """Return the object stored as ``Application/uniben`` in the root
    of the connection `conn`.
    """
    root = conn.root()
    application = root["Application"]
    return application["uniben"]
80
81
def export():
    """Main function.

    Export the students container of the university in chunks (as
    computed by `partition()`), writing one CSV file per chunk into a
    freshly created temporary directory. The directory and the elapsed
    time are printed at the end.

    Each chunk is exported over its own connection, which is closed
    again even if the export of that chunk fails. The databases
    themselves are left open; pass the result of `init_dbs()` to
    `close_dbs()` if you need them closed explicitly.
    """
    closables = init_dbs()
    main_db = closables[0]

    # A first, short-lived connection is only needed to compute the
    # partitions.
    conn = main_db.open()
    try:
        parts = partition(get_university(conn)['students'])
        print(parts)
    finally:
        conn.close()

    workdir = tempfile.mkdtemp()

    t1 = time.time()
    for p_num, start, end in parts:
        # Every loop reopens the connection (presumably to keep memory
        # consumption bounded -- objects loaded for one chunk are not
        # carried over to the next one).
        conn = main_db.open()
        try:
            studs = get_university(conn)['students']
            curr_path = os.path.join(workdir, "myexport%s.csv" % p_num)
            export_part(studs, p_num, start, end, curr_path)
        finally:
            conn.close()
    print("Result in %s" % workdir)
    t2 = time.time()
    print("Elapsed: %s secs" % (t2 - t1))
113
114
def export_part(container, part_num, start, end, path):
    """Export the elements `start` to `end` of `container` to `path`.

    The elements are picked with `get_all_students()` and written by
    the CSV exporter registered under the name "students". Memory
    consumption (resident set size) of the current process and the time
    needed are printed before and after the export.

    `part_num` is the number of the exported chunk; it is currently not
    used in here.

    Requires `psutil` to be installed.
    """
    import psutil  # late import. Don't want to make it waeup.uniben req.
    proc = psutil.Process(os.getpid())
    # Newer psutil releases call this `memory_info()`; the old name
    # `get_memory_info()` is only available in older releases.
    read_memory = getattr(proc, "memory_info", None)
    if read_memory is None:
        read_memory = proc.get_memory_info
    gc.collect()  # get rid of garbage before measuring
    mem1 = read_memory().rss
    print("  Export %s-%s to %s (mem: %s)" % (start, end, path, mem1))
    tp1 = time.time()
    exporter = getUtility(ICSVExporter, name="students")
    exporter.export(get_all_students(container, start, end), path)
    tp2 = time.time()
    mem2 = read_memory().rss
    print("  Done (%s secs, mem: %s)" % (tp2 - tp1, mem2))
127
128
def get_all_students(container, cnt_from=0, cnt_to=0):
    """Yield the values of `container` at positions `cnt_from`..`cnt_to`.

    Positions are counted from zero in iteration order of the keys of
    `container`; both bounds are inclusive. Values are looked up with
    `container.get(key)`.

    A `cnt_to` of ``0`` (or any other false value) means: no upper
    bound, i.e. everything from `cnt_from` on is delivered.
    """
    for position, key in enumerate(container):
        if position < cnt_from:
            # Not there yet, skip leading elements.
            continue
        if cnt_to and position > cnt_to:
            return
        # No local reference to the value is kept around.
        yield container.get(key)
141
142
def partition(container, part_size=10000):
    """Compute chunks of at most `part_size` elements for `container`.

    The result is a list of triples (<num>, <index_start>, <index_end>),
    one per chunk, where <num> is the running number of the chunk and
    both indexes are inclusive.

    Only the length of `container` is looked at, so it must support
    `len()`. The number of elements found is printed.

    For instance a container of size 250 and `part_size` 100 would give:

      [(0,   0,  99),
       (1, 100, 199),
       (2, 200, 249),
       ]

    """
    total = len(container)
    print("Container elements: %s" % total)
    last_index = total - 1
    chunks = []
    for first_index in range(0, total, part_size):
        chunk_end = first_index + part_size - 1
        if chunk_end > last_index:
            # The last chunk might be shorter than the others.
            chunk_end = last_index
        chunks.append((len(chunks), first_index, chunk_end))
    return chunks
Note: See TracBrowser for help on using the repository browser.