Merge pull request #1271 from SchrodingersGat/migration-unit-test

Migration unit test
This commit is contained in:
Oliver 2021-02-04 23:56:12 +11:00 committed by GitHub
commit af7a627230
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 495 additions and 44 deletions

View File

@ -1,8 +1,6 @@
[run]
source = ./InvenTree
omit =
# Do not run coverage on migration files
*/migrations/*
InvenTree/manage.py
InvenTree/setup.py
InvenTree/InvenTree/middleware.py

View File

@ -48,6 +48,10 @@ script:
- rm inventree_default_db.sqlite3
- invoke migrate
- invoke import-records -f data.json
# Run linting checks on migration files (django-migration-linter)
# Run subset of linting checks on *ALL* migration files
# Run strict migration file checks on *NEW* migrations (old ones are what they are)
- cd InvenTree && python manage.py lintmigrations 79ddea50f507e34195bad635008419daac0d7a5f -q ok ignore --no-cache && cd ..
after_success:
- coveralls

View File

@ -492,3 +492,72 @@ def addUserPermissions(user, permissions):
for permission in permissions:
addUserPermission(user, permission)
def getMigrationFileNames(app):
"""
Return a list of all migration filenames for provided app
"""
local_dir = os.path.dirname(os.path.abspath(__file__))
migration_dir = os.path.join(local_dir, '..', app, 'migrations')
files = os.listdir(migration_dir)
# Regex pattern for migration files
pattern = r"^[\d]+_.*\.py$"
migration_files = []
for f in files:
if re.match(pattern, f):
migration_files.append(f)
return migration_files
def getOldestMigrationFile(app, exclude_extension=True, ignore_initial=True):
"""
Return the filename associated with the oldest migration
"""
oldest_num = -1
oldest_file = None
for f in getMigrationFileNames(app):
if ignore_initial and f.startswith('0001_initial'):
continue
num = int(f.split('_')[0])
if oldest_file is None or num < oldest_num:
oldest_num = num
oldest_file = f
if exclude_extension:
oldest_file = oldest_file.replace('.py', '')
return oldest_file
def getNewestMigrationFile(app, exclude_extension=True):
"""
Return the filename associated with the newest migration
"""
newest_file = None
newest_num = -1
for f in getMigrationFileNames(app):
num = int(f.split('_')[0])
if newest_file is None or num > newest_num:
newest_num = num
newest_file = f
if exclude_extension:
newest_file = newest_file.replace('.py', '')
return newest_file

View File

@ -212,6 +212,7 @@ INSTALLED_APPS = [
'djmoney', # django-money integration
'djmoney.contrib.exchange', # django-money exchange rates
'error_report', # Error reporting in the admin interface
'django_migration_linter', # Linting checking for migration files
]
MIDDLEWARE = CONFIG.get('middleware', [

View File

@ -17,6 +17,8 @@ def nupdate_tree(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('build', '0012_build_sales_order'),
]

View File

@ -9,7 +9,7 @@ def add_default_reference(apps, schema_editor):
Best we can do is use the PK of the build order itself.
"""
Build = apps.get_model('build', 'Build')
Build = apps.get_model('build', 'build')
count = 0
@ -31,6 +31,8 @@ def reverse_default_reference(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('build', '0017_auto_20200426_0612'),
]

View File

@ -0,0 +1,118 @@
"""
Tests for the build model database migrations
"""
from django_test_migrations.contrib.unittest_case import MigratorTestCase
from InvenTree import helpers
class TestForwardMigrations(MigratorTestCase):
"""
Test entire schema migration sequence for the build app
"""
migrate_from = ('build', helpers.getOldestMigrationFile('build'))
migrate_to = ('build', helpers.getNewestMigrationFile('build'))
def prepare(self):
"""
Create initial data!
"""
Part = self.old_state.apps.get_model('part', 'part')
buildable_part = Part.objects.create(
name='Widget',
description='Buildable Part',
active=True,
)
with self.assertRaises(TypeError):
# Cannot set the 'assembly' field as it hasn't been added to the db schema
Part.objects.create(
name='Blorb',
description='ABCDE',
assembly=True
)
Build = self.old_state.apps.get_model('build', 'build')
Build.objects.create(
part=buildable_part,
title='A build of some stuff',
quantity=50
)
def test_items_exist(self):
Part = self.new_state.apps.get_model('part', 'part')
self.assertEqual(Part.objects.count(), 1)
Build = self.new_state.apps.get_model('build', 'build')
self.assertEqual(Build.objects.count(), 1)
# Check that the part object now has an assembly field
part = Part.objects.all().first()
part.assembly = True
part.save()
part.assembly = False
part.save()
class TestReferenceMigration(MigratorTestCase):
"""
Test custom migration which adds 'reference' field to Build model
"""
migrate_from = ('build', helpers.getOldestMigrationFile('build'))
migrate_to = ('build', '0018_build_reference')
def prepare(self):
"""
Create some builds
"""
Part = self.old_state.apps.get_model('part', 'part')
part = Part.objects.create(
name='Part',
description='A test part'
)
Build = self.old_state.apps.get_model('build', 'build')
Build.objects.create(
part=part,
title='My very first build',
quantity=10
)
Build.objects.create(
part=part,
title='My very second build',
quantity=10
)
Build.objects.create(
part=part,
title='My very third build',
quantity=10
)
# Ensure that the builds *do not* have a 'reference' field
for build in Build.objects.all():
with self.assertRaises(AttributeError):
print(build.reference)
def test_build_reference(self):
Build = self.new_state.apps.get_model('build', 'build')
self.assertEqual(Build.objects.count(), 3)
# Check that the build reference is properly assigned
for build in Build.objects.all():
self.assertEqual(str(build.reference), str(build.pk))

View File

@ -1,14 +1,21 @@
# Generated by Django 2.2.10 on 2020-04-13 06:42
import sys
import os
from rapidfuzz import fuzz
from django.db import migrations, connection
from django.db.utils import OperationalError, ProgrammingError
"""
When this migration is tested by CI, it cannot accept user input.
So a simplified version of the migration is implemented.
"""
TESTING = 'test' in sys.argv
def clear():
os.system('cls' if os.name == 'nt' else 'clear')
if not TESTING:
os.system('cls' if os.name == 'nt' else 'clear')
def reverse_association(apps, schema_editor):
@ -144,10 +151,12 @@ def associate_manufacturers(apps, schema_editor):
# Have we already mapped this
if name in links.keys():
print(" - Part[{pk}]: Mapped '{n}' - '{c}'".format(pk=part_id, n=name, c=links[name].name))
print(" - Part[{pk}]: Mapped '{n}' - manufacturer <{c}>".format(pk=part_id, n=name, c=links[name]))
manufacturer_id = links[name]
query = f"update part_supplierpart set manufacturer_id={manufacturer_id} where id={part_id};"
result = query.execute()
result = cursor.execute(query)
return True
# Mapping not possible
@ -156,29 +165,24 @@ def associate_manufacturers(apps, schema_editor):
def create_manufacturer(part_id, input_name, company_name):
""" Create a new manufacturer """
# Manually create a new database row
# Note: Have to fill out all empty string values!
new_manufacturer_query = f"insert into company_company ('name', 'description', 'is_customer', 'is_supplier', 'is_manufacturer', 'address', 'website', 'phone', 'email', 'contact', 'link', 'notes') values ('{company_name}', '{company_name}', false, false, true, '', '', '', '', '', '', '');"
Company = apps.get_model('company', 'company')
cursor = connection.cursor()
cursor.execute(new_manufacturer_query)
# Extract the company back from the database
response = cursor.execute(f"select id from company_company where name='{company_name}';")
row = cursor.fetchone()
manufacturer_id = int(row[0])
manufacturer = Company.objects.create(
name=company_name,
description=company_name,
is_manufacturer=True
)
# Map both names to the same company
links[input_name] = manufacturer_id
links[company_name] = manufacturer_id
links[input_name] = manufacturer.pk
links[company_name] = manufacturer.pk
companies[company_name] = manufacturer_id
companies[company_name] = manufacturer.pk
print(" - Part[{pk}]: Created new manufacturer: '{name}'".format(pk=part_id, name=company_name))
# Update SupplierPart object in the database
cursor.execute(f"update part_supplierpart set manufacturer_id={manufacturer_id} where id={part_id};")
cursor.execute(f"update part_supplierpart set manufacturer_id={manufacturer.pk} where id={part_id};")
def find_matches(text, threshold=65):
"""
@ -222,23 +226,31 @@ def associate_manufacturers(apps, schema_editor):
clear()
# Present a list of options
print("----------------------------------")
if not TESTING:
print("----------------------------------")
print("Checking part [{pk}] ({idx} of {total})".format(pk=part_id, idx=idx+1, total=total))
print("Manufacturer name: '{n}'".format(n=name))
print("----------------------------------")
print("Select an option from the list below:")
if not TESTING:
print("Manufacturer name: '{n}'".format(n=name))
print("----------------------------------")
print("Select an option from the list below:")
print("0) - Create new manufacturer '{n}'".format(n=name))
print("")
print("0) - Create new manufacturer '{n}'".format(n=name))
print("")
for i, m in enumerate(matches[:10]):
print("{i}) - Use manufacturer '{opt}'".format(i=i+1, opt=m))
for i, m in enumerate(matches[:10]):
print("{i}) - Use manufacturer '{opt}'".format(i=i+1, opt=m))
print("")
print("OR - Type a new custom manufacturer name")
print("")
print("OR - Type a new custom manufacturer name")
while True:
response = str(input("> ")).strip()
if TESTING:
# When running unit tests, simply select the name of the part
response = '0'
else:
response = str(input("> ")).strip()
# Attempt to parse user response as an integer
try:
@ -300,18 +312,19 @@ def associate_manufacturers(apps, schema_editor):
print("")
clear()
print("---------------------------------------")
print("The SupplierPart model needs to be migrated,")
print("as the new 'manufacturer' field maps to a 'Company' reference.")
print("The existing 'manufacturer_name' field will be used to match")
print("against possible companies.")
print("This process requires user input.")
print("")
print("Note: This process MUST be completed to migrate the database.")
print("---------------------------------------")
print("")
if not TESTING:
print("---------------------------------------")
print("The SupplierPart model needs to be migrated,")
print("as the new 'manufacturer' field maps to a 'Company' reference.")
print("The existing 'manufacturer_name' field will be used to match")
print("against possible companies.")
print("This process requires user input.")
print("")
print("Note: This process MUST be completed to migrate the database.")
print("---------------------------------------")
print("")
input("Press <ENTER> to continue.")
input("Press <ENTER> to continue.")
clear()
@ -336,6 +349,8 @@ def associate_manufacturers(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('company', '0018_supplierpart_manufacturer'),
]

View File

@ -19,6 +19,8 @@ def make_empty_email_field_null(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('company', '0023_auto_20200808_0715'),
]

View File

@ -138,6 +138,8 @@ def reverse_currencies(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('company', '0025_auto_20201110_1001'),
]

View File

@ -0,0 +1,171 @@
"""
Tests for the company model database migrations
"""
from django_test_migrations.contrib.unittest_case import MigratorTestCase
from InvenTree import helpers
class TestForwardMigrations(MigratorTestCase):
migrate_from = ('company', helpers.getOldestMigrationFile('company'))
migrate_to = ('company', helpers.getNewestMigrationFile('company'))
def prepare(self):
"""
Create some simple Company data, and ensure that it migrates OK
"""
Company = self.old_state.apps.get_model('company', 'company')
Company.objects.create(
name='MSPC',
description='Michael Scotts Paper Company',
is_supplier=True
)
def test_migrations(self):
Company = self.new_state.apps.get_model('company', 'company')
self.assertEqual(Company.objects.count(), 1)
class TestManufacturerField(MigratorTestCase):
"""
Tests for migration 0019 which migrates from old 'manufacturer_name' field to new 'manufacturer' field
"""
migrate_from = ('company', '0018_supplierpart_manufacturer')
migrate_to = ('company', '0019_auto_20200413_0642')
def prepare(self):
"""
Prepare the database by adding some test data 'before' the change:
- Part object
- Company object (supplier)
- SupplierPart object
"""
Part = self.old_state.apps.get_model('part', 'part')
Company = self.old_state.apps.get_model('company', 'company')
SupplierPart = self.old_state.apps.get_model('company', 'supplierpart')
# Create an initial part
part = Part.objects.create(
name='Screw',
description='A single screw'
)
# Create a company to act as the supplier
supplier = Company.objects.create(
name='Supplier',
description='A supplier of parts',
is_supplier=True,
is_customer=False,
)
# Add some SupplierPart objects
SupplierPart.objects.create(
part=part,
supplier=supplier,
SKU='SCREW.001',
manufacturer_name='ACME',
)
SupplierPart.objects.create(
part=part,
supplier=supplier,
SKU='SCREW.002',
manufacturer_name='Zero Corp'
)
self.assertEqual(Company.objects.count(), 1)
def test_company_objects(self):
"""
Test that the new companies have been created successfully
"""
# Two additional company objects should have been created
Company = self.new_state.apps.get_model('company', 'company')
self.assertEqual(Company.objects.count(), 3)
# The new company/ies must be marked as "manufacturers"
acme = Company.objects.get(name='ACME')
self.assertTrue(acme.is_manufacturer)
SupplierPart = self.new_state.apps.get_model('company', 'supplierpart')
parts = SupplierPart.objects.filter(manufacturer=acme)
self.assertEqual(parts.count(), 1)
part = parts.first()
# Checks on the SupplierPart object
self.assertEqual(part.manufacturer_name, 'ACME')
self.assertEqual(part.manufacturer.name, 'ACME')
class TestCurrencyMigration(MigratorTestCase):
"""
Tests for upgrade from basic currency support to django-money
"""
migrate_from = ('company', '0025_auto_20201110_1001')
migrate_to = ('company', '0026_auto_20201110_1011')
def prepare(self):
"""
Prepare some data:
- A part to buy
- A supplier to buy from
- A supplier part
- Multiple currency objects
- Multiple supplier price breaks
"""
Part = self.old_state.apps.get_model('part', 'part')
part = Part.objects.create(
name="PART", description="A purchaseable part",
purchaseable=True,
level=0,
tree_id=0,
lft=0,
rght=0
)
Company = self.old_state.apps.get_model('company', 'company')
supplier = Company.objects.create(name='Supplier', description='A supplier', is_supplier=True)
SupplierPart = self.old_state.apps.get_model('company', 'supplierpart')
sp = SupplierPart.objects.create(part=part, supplier=supplier, SKU='12345')
Currency = self.old_state.apps.get_model('common', 'currency')
aud = Currency.objects.create(symbol='$', suffix='AUD', description='Australian Dollars', value=1.0)
usd = Currency.objects.create(symbol='$', suffix='USD', description='US Dollars', value=1.0)
PB = self.old_state.apps.get_model('company', 'supplierpricebreak')
PB.objects.create(part=sp, quantity=10, cost=5, currency=aud)
PB.objects.create(part=sp, quantity=20, cost=3, currency=aud)
PB.objects.create(part=sp, quantity=30, cost=2, currency=aud)
PB.objects.create(part=sp, quantity=40, cost=2, currency=usd)
PB.objects.create(part=sp, quantity=50, cost=2, currency=usd)
for pb in PB.objects.all():
self.assertIsNone(pb.price)
def test_currency_migration(self):
PB = self.new_state.apps.get_model('company', 'supplierpricebreak')
for pb in PB.objects.all():
# Test that a price has been assigned
self.assertIsNotNone(pb.price)

View File

@ -12,6 +12,8 @@ def update_tree(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('part', '0019_auto_20190908_0404'),
]

View File

@ -18,6 +18,8 @@ def create_thumbnails(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('part', '0033_auto_20200404_0445'),
]

View File

@ -16,6 +16,8 @@ def nupdate_tree(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('part', '0038_auto_20200513_0016'),
]

View File

@ -138,6 +138,8 @@ def reverse_currencies(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('part', '0055_auto_20201110_1001'),
]

View File

@ -0,0 +1,51 @@
"""
Unit tests for the part model database migrations
"""
from django_test_migrations.contrib.unittest_case import MigratorTestCase
from InvenTree import helpers
class TestForwardMigrations(MigratorTestCase):
"""
Test entire schema migration sequence for the part app
"""
migrate_from = ('part', helpers.getOldestMigrationFile('part'))
migrate_to = ('part', helpers.getNewestMigrationFile('part'))
def prepare(self):
"""
Create initial data
"""
Part = self.old_state.apps.get_model('part', 'part')
Part.objects.create(name='A', description='My part A')
Part.objects.create(name='B', description='My part B')
Part.objects.create(name='C', description='My part C')
Part.objects.create(name='D', description='My part D')
Part.objects.create(name='E', description='My part E')
# Extract one part object to investigate
p = Part.objects.all().last()
# Initially some fields are not present
with self.assertRaises(AttributeError):
print(p.has_variants)
with self.assertRaises(AttributeError):
print(p.is_template)
def test_models_exist(self):
Part = self.new_state.apps.get_model('part', 'part')
self.assertEqual(Part.objects.count(), 5)
for part in Part.objects.all():
part.is_template = True
part.save()
part.is_template = False
part.save()

View File

@ -13,6 +13,8 @@ def update_tree(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('stock', '0011_auto_20190908_0404'),
]

View File

@ -12,6 +12,8 @@ def update_stock_item_tree(apps, schema_editor):
class Migration(migrations.Migration):
atomic = False
dependencies = [
('stock', '0021_auto_20200215_2232'),
]

View File

@ -28,6 +28,8 @@ django-debug-toolbar==2.2 # Debug / profiling toolbar
django-admin-shell==0.1.2 # Python shell for the admin interface
django-money==1.1 # Django app for currency management
certifi # Certifi is (most likely) installed through one of the requirements above
django-error-report==0.2.0 # Error report viewer for the admin interface
django-error-report==0.2.0 # Error report viewer for the admin interface
django-test-migrations==1.1.0 # Unit testing for database migrations
django-migration-linter==2.5.0 # Linting checks for database migrations
inventree # Install the latest version of the InvenTree API python library

View File

@ -9,6 +9,8 @@ ignore =
C901,
# - N802 - function name should be lowercase (In the future, we should conform to this!)
N802,
# - N806 - variable should be lowercase
N806,
N812,
exclude = .git,__pycache__,*/migrations/*,*/lib/*,*/bin/*,*/media/*,*/static/*,*ci_*.py*
max-complexity = 20