Ignore:
Timestamp:
6 Jun 2012, 00:45:24 (12 years ago)
Author:
uli
Message:

Add a helper function to append one CSV file to another.

Location:
main/waeup.kofa/trunk/src/waeup/kofa/utils
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/trunk/src/waeup/kofa/utils/helpers.py

    r8466 r8631  
    1818"""General helper functions for Kofa.
    1919"""
     20import csv
    2021import datetime
    2122import imghdr
     
    2425import re
    2526import shutil
     27import tempfile
    2628import grok
    2729from cStringIO import StringIO
     
    639641if check_pdf not in imghdr.tests:
    640642    imghdr.tests.append(check_pdf)
     643
     644def merge_csv_files(path1, path2):
     645    """Merge two CSV files into one (appending).
     646
     647    CSV data from `path2` will be merged into `path1` csv file. This
     648    is a bit like 'appending' data from path2 to data from path1.
     649
     650    The path of the resulting temporary file will be returned.
     651
     652    In the result file data from `path2` will always come _after_ data
     653    from `path1`.
     654
     655    **Caution**: It is the _callers_ responsibility to remove the
     656    result file (which is created by tempfile.mkstemp) after usage.
     657
     658    This CSV file merging copes with different column orders in both
     659    CSV files and even with different column sets in both files.
     660    """
     661    # sniff the col names
     662    row10 = csv.DictReader(open(path1, 'rb')).next()
     663    row20 = csv.DictReader(open(path2, 'rb')).next()
     664    fieldnames = sorted(list(set(row10.keys() + row20.keys())))
     665    # now read/write the real data
     666    reader1 = csv.DictReader(open(path1, 'rb'))
     667    reader2 = csv.DictReader(open(path2, 'rb'))
     668    wp, tmp_path = tempfile.mkstemp()
     669    writer = csv.DictWriter(os.fdopen(wp, 'wb'), fieldnames)
     670    writer.writerow(dict((x,x) for x in fieldnames)) # header
     671    for row in reader1:
     672        writer.writerow(row)
     673    for row in reader2:
     674        writer.writerow(row)
     675    return tmp_path
  • main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_helpers.py

    r8466 r8631  
    372372        return
    373373
     374class MergeCSVFileTestCase(unittest.TestCase):
     375
     376    def setUp(self):
     377        self.workdir = tempfile.mkdtemp()
     378        self.path1 = os.path.join(self.workdir, 'myfile1')
     379        self.path2 = os.path.join(self.workdir, 'myfile2')
     380        self.result_path = None
     381        return
     382
     383    def tearDown(self):
     384        shutil.rmtree(self.workdir)
     385        if self.result_path is not None and os.path.exists(self.result_path):
     386            os.unlink(self.result_path)
     387        return
     388
     389    def test_basic(self):
     390        # we can merge very basic CSV files
     391        open(self.path1, 'wb').write('name,age\nManfred,32\n')
     392        open(self.path2, 'wb').write('name,age\nBarney,28\n')
     393        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
     394        contents = open(self.result_path, 'r').read()
     395        self.assertEqual(
     396            contents,
     397            'age,name\r\n'
     398            '32,Manfred\r\n'
     399            '28,Barney\r\n')
     400        return
     401
     402    def test_different_col_order(self):
     403        # if cols of both files have different order, that won't stop us
     404        open(self.path1, 'wb').write('name,age\nManfred,32\n')
     405        open(self.path2, 'wb').write('age,name\n28,Barney\n')
     406        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
     407        contents = open(self.result_path, 'r').read()
     408        self.assertEqual(
     409            contents,
     410            'age,name\r\n'
     411            '32,Manfred\r\n'
     412            '28,Barney\r\n')
     413        return
     414
     415    def test_different_cols_at_all(self):
     416        # also cols available only in one file will work.
     417        open(self.path1, 'wb').write('name,age\nManfred,32\n')
     418        open(self.path2, 'wb').write('name,age,buddy\nBarney,28,Manfred\n')
     419        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
     420        contents = open(self.result_path, 'r').read()
     421        self.assertEqual(
     422            contents,
     423            'age,buddy,name\r\n'
     424            '32,,Manfred\r\n'
     425            '28,Manfred,Barney\r\n')
     426        return
     427
     428
    374429def test_suite():
    375430    suite = unittest.TestSuite()
     
    386441        DateTimeHelpersTestCase,
    387442        GetFileFormatTestCase,
     443        MergeCSVFileTestCase,
    388444        ]:
    389445        suite.addTests(
Note: See TracChangeset for help on using the changeset viewer.