Ignore:
Timestamp:
6 Jun 2012, 01:40:08 (12 years ago)
Author:
uli
Message:

Let datacenter use the new CSV file merger from helpers.

CAUTION: this is a bit tested, but not in real use. Try
some local imports before using it in production environment!

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/trunk/src/waeup/kofa/tests/test_datacenter.py

    r8592 r8634  
    213213        self.assertEqual(names, expected)
    214214        return
     215
     216    def test_append_csv_file(self):
     217        # we can append CSV files to others
     218        datacenter = DataCenter()
     219        csv_file1 = os.path.join(self.workdir, 'foo.csv')
     220        csv_file2 = os.path.join(self.workdir, 'bar.csv')
     221        open(csv_file1, 'wb').write('name,age\nBarney,28')
     222        open(csv_file2, 'wb').write('name,age\nManfred,28')
     223        datacenter._appendCSVFile(csv_file2, csv_file1)
     224        result = open(csv_file1, 'rb').read()
     225        self.assertEqual(result, 'age,name\r\n28,Barney\r\n28,Manfred\r\n')
     226        # The source is deleted afterwards
     227        self.assertEqual(os.path.exists(csv_file2), False)
     228
     229    def test_append_csv_file_no_dest(self):
     230        # a non-existing dest CSV file will result in a simple move
     231        datacenter = DataCenter()
     232        csv_file1 = os.path.join(self.workdir, 'foo.csv')
     233        csv_file2 = os.path.join(self.workdir, 'bar.csv')
     234        # csv_file1 does not exist
     235        open(csv_file2, 'wb').write('name,age\nManfred,28\n')
     236        datacenter._appendCSVFile(csv_file2, csv_file1)
     237        result = open(csv_file1, 'rb').read()
     238        # raw input, no CSV mangling
     239        self.assertEqual(result, 'name,age\nManfred,28\n')
     240        # The source is deleted afterwards
     241        self.assertEqual(os.path.exists(csv_file2), False)
     242
     243    def test_append_csv_file_no_source(self):
     244        # a non existing source file will mean no changes at all
     245        datacenter = DataCenter()
     246        csv_file1 = os.path.join(self.workdir, 'foo.csv')
     247        csv_file2 = os.path.join(self.workdir, 'bar.csv')
     248        open(csv_file1, 'wb').write('name,age\nManfred,28\n')
     249        # csv_file2 does not exist
     250        datacenter._appendCSVFile(csv_file2, csv_file1)
     251        result = open(csv_file1, 'rb').read()
     252        # csv_file1 is the same as before
     253        self.assertEqual(result, 'name,age\nManfred,28\n')
     254
     255    def test_append_csv_file_same_src_and_dest(self):
     256        # if both csv files are the same file, nothing will be changed
     257        datacenter = DataCenter()
     258        csv_file1 = os.path.join(self.workdir, 'foo.csv')
     259        csv_file2 = csv_file1
     260        open(csv_file1, 'wb').write('name,age\nManfred,28\n')
     261        # csv_file2 does not exist
     262        datacenter._appendCSVFile(csv_file2, csv_file1)
     263        result = open(csv_file1, 'rb').read()
     264        # csv_file1 is the same as before
     265        self.assertEqual(result, 'name,age\nManfred,28\n')
     266        self.assertEqual(os.path.exists(csv_file2), True)
Note: See TracChangeset for help on using the changeset viewer.