Changeset 8631
- Timestamp:
- 6 Jun 2012, 00:45:24 (13 years ago)
- Location:
- main/waeup.kofa/trunk/src/waeup/kofa/utils
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/utils/helpers.py
r8466 r8631 18 18 """General helper functions for Kofa. 19 19 """ 20 import csv 20 21 import datetime 21 22 import imghdr … … 24 25 import re 25 26 import shutil 27 import tempfile 26 28 import grok 27 29 from cStringIO import StringIO … … 639 641 if check_pdf not in imghdr.tests: 640 642 imghdr.tests.append(check_pdf) 643 644 def merge_csv_files(path1, path2): 645 """Merge two CSV files into one (appending). 646 647 CSV data from `path2` will be merged into `path1` csv file. This 648 is a bit like 'appending' data from path2 to data from path1. 649 650 The path of the resulting temporary file will be returned. 651 652 In the result file data from `path2` will always come _after_ data 653 from `path1`. 654 655 **Caution**: It is the _callers_ responsibility to remove the 656 result file (which is created by tempfile.mkstemp) after usage. 657 658 This CSV file merging copes with different column orders in both 659 CSV files and even with different column sets in both files. 660 """ 661 # sniff the col names 662 row10 = csv.DictReader(open(path1, 'rb')).next() 663 row20 = csv.DictReader(open(path2, 'rb')).next() 664 fieldnames = sorted(list(set(row10.keys() + row20.keys()))) 665 # now read/write the real data 666 reader1 = csv.DictReader(open(path1, 'rb')) 667 reader2 = csv.DictReader(open(path2, 'rb')) 668 wp, tmp_path = tempfile.mkstemp() 669 writer = csv.DictWriter(os.fdopen(wp, 'wb'), fieldnames) 670 writer.writerow(dict((x,x) for x in fieldnames)) # header 671 for row in reader1: 672 writer.writerow(row) 673 for row in reader2: 674 writer.writerow(row) 675 return tmp_path -
main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_helpers.py
r8466 r8631 372 372 return 373 373 374 class MergeCSVFileTestCase(unittest.TestCase): 375 376 def setUp(self): 377 self.workdir = tempfile.mkdtemp() 378 self.path1 = os.path.join(self.workdir, 'myfile1') 379 self.path2 = os.path.join(self.workdir, 'myfile2') 380 self.result_path = None 381 return 382 383 def tearDown(self): 384 shutil.rmtree(self.workdir) 385 if self.result_path is not None and os.path.exists(self.result_path): 386 os.unlink(self.result_path) 387 return 388 389 def test_basic(self): 390 # we can merge very basic CSV files 391 open(self.path1, 'wb').write('name,age\nManfred,32\n') 392 open(self.path2, 'wb').write('name,age\nBarney,28\n') 393 self.result_path = helpers.merge_csv_files(self.path1, self.path2) 394 contents = open(self.result_path, 'r').read() 395 self.assertEqual( 396 contents, 397 'age,name\r\n' 398 '32,Manfred\r\n' 399 '28,Barney\r\n') 400 return 401 402 def test_different_col_order(self): 403 # if cols of both files have different order, that won't stop us 404 open(self.path1, 'wb').write('name,age\nManfred,32\n') 405 open(self.path2, 'wb').write('age,name\n28,Barney\n') 406 self.result_path = helpers.merge_csv_files(self.path1, self.path2) 407 contents = open(self.result_path, 'r').read() 408 self.assertEqual( 409 contents, 410 'age,name\r\n' 411 '32,Manfred\r\n' 412 '28,Barney\r\n') 413 return 414 415 def test_different_cols_at_all(self): 416 # also cols available only in one file will work. 417 open(self.path1, 'wb').write('name,age\nManfred,32\n') 418 open(self.path2, 'wb').write('name,age,buddy\nBarney,28,Manfred\n') 419 self.result_path = helpers.merge_csv_files(self.path1, self.path2) 420 contents = open(self.result_path, 'r').read() 421 self.assertEqual( 422 contents, 423 'age,buddy,name\r\n' 424 '32,,Manfred\r\n' 425 '28,Manfred,Barney\r\n') 426 return 427 428 374 429 def test_suite(): 375 430 suite = unittest.TestSuite() … … 386 441 DateTimeHelpersTestCase, 387 442 GetFileFormatTestCase, 443 MergeCSVFileTestCase, 388 444 ]: 389 445 suite.addTests(
Note: See TracChangeset for help on using the changeset viewer.