InvenTree/tasks.py
Oliver 1f6cd9fc54
[WIP] Data importer (#6911)
* Adds new model for DataImportSession

* Add file extension validation

Expose to admin interface also

* Switch to new 'importer' app

* Refactoring to help prevent circular imports

* Add serializer registry

- Use @register_importer tag for any serializer class
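
A rough sketch of this registration pattern (the decorator comes from this changelog; the module paths and serializer body are assumptions, for illustration only):

    from rest_framework import serializers
    from importer.registry import register_importer  # assumed module path
    from part.models import Part

    @register_importer()
    class PartImportSerializer(serializers.ModelSerializer):
        """Hypothetical serializer registered with the import framework."""

        class Meta:
            model = Part
            fields = ['name', 'description', 'category']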

* Cleanup migration file

- Do not use one-time hard-coded values here

* Refactor code into registry.py

* Add validation for the uploaded file

- Must be importable by tablib

* Refactoring

* Adds property to retrieve matching serializer class

* Update helper functions

* Add hook to auto-assign columns on initial creation

* Rename field

* Enforce initial status value

* Add model for individual rows in the data import

* Add DataImportRow model

* Extract data rows as dict

* Update fields

- Remove "progress" field (will be calculated)
- Added "timestamp" field
- Added "complete" field to DataImportRow

* Auto-map column names

- Provide "sensible" default values

* Add API endpoint for DataImportSession

* Offload data import operation

- For large data files this may take a significant amount of time
- Offload it to the background worker process
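
Roughly, the offload pattern (InvenTree's background worker is django-q - see the 'worker' task in the file below - but the task path here is illustrative):

    from django_q.tasks import async_task

    # session_id: PK of the DataImportSession to process
    async_task('importer.tasks.import_data', session_id)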

* Refactor data import code

* Update models

- Add "columns" field to DataImportSession
- Add "errors" field to DataImportRow

* Move field mapping to a new model type

- Simpler validation

* Save "valid" status for each data row

* Include session defaults when validating row data

* Update content_excludes

- Ignore importer models in import/export

* Remove port from ALLOWED_HOST entries

* Skip table events for importer models

* Bug fixes

* Serializer updates

* Add more endpoints

- DataImportColumnMappingList
- DataImportRowList

* further updates:

- Add 'get_api_url' method
- Handle case where

* Expose "available fields" to the DataImportSession serializer

Uses the (already available) inventree metadata middleware

* Add detail endpoints

* Clear existing column mappings

* Add endpoint for accepting column mappings

* Add API endpoint exposing available importer serializers

* Add simple playground area for testing data importer

* Adds simple form to start new import session

- Needs work, file field does not currently function correctly

* data_file is *not* read_only

* Add check for file type

* Remove debug statements

* Refactor column mapping

- Generate mapping for each column
- Remove "columns" field
- Column names are calculated dynamically

* Fix uniqueness requirements on mapping table

* Admin updates

- Prevent deletion of mappings
- Prevent addition of mappings

* API endpoint updates

- Prevent mappings from being deleted
- Prevent mappings from being created

* Update importer drawer

* Add widget for selecting data columns

* UI tweaks

* Delete import session when closing modal

* Allow empty string value

* Complete column mapping

* Adds ability to remove rows

* Adjust drawer specs

* Add column 'description' to serializer

* Add option to hide labels in API form field

* Update column heading

* Fix frontend linting errors

* Revert drawer position

* Return correct type

* Fix shadowing

* Fix f-string

* simplify frontend code

* Move importer app

* Update API version

* Reintroduce export formats

* Add new models to RuleSet

* typescript cleanup

* Typescript cleanup

* Improvement for Switch / boolean field

* Display original row data on popover

* Only display mapped columns

* Add DataExportMixin class

- Replaces existing APIDownloadMixin
- Uses DRF serializers for exporting
- *much* more efficient
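
Hypothetical usage sketch (the mixin name is from this changelog; the view and serializer shown are invented):

    from importer.mixins import DataExportMixin  # assumed module path

    class PartList(DataExportMixin, ListAPI):
        """Existing list endpoint, now exporting via DRF serializers instead of download_queryset()."""

        queryset = Part.objects.all()
        serializer_class = PartSerializer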

* Create new file: importer.mixins.py

* Add new mixin to existing views which support data export

* Better error handling

* Cleanup:

- Remove references to APIDownloadMixin
- Remove download_queryset method
- All now handled by API-based export functionality

* Replace table with InvenTreeTable

- Paginate imported rows
- Data can be searched, ordered,

* Make 'pathstring' fields read-only

* Expose list of valid importer types to the API

* Exclude read-only fields

* Cleanup

* Updates for session model

- Column is now editable on mapping object
- Field is no longer editable
- Improve admin integration

* Adds new custom hook for controlling data import session

* Refactor column mapping widget

* Refactor ImportDataSelector

* Working on ImportDataSelector component

* Adds method for editing fields in import table

- Cell edit mode
- Row edit mode
- Form submission still needs work!

* Adds background task for removing old import sessions

* Fix api_version.py

* Update src/frontend/src/components/importer/ImportDataSelector.tsx

Co-authored-by: Lukas <76838159+wolflu05@users.noreply.github.com>

* Update model verbose names

* Rename mixin class

* Add serializer mixin classes

- Will allow for fine-tuning of the import/export process

* @register_importer requires specific mixin

* Check subclass for export

* Fix typos

* Refactor export serializer

- Keep operations local to the class

* Add shim class to process an individual row before exporting it

* Add mixin to existing serializers

* Add export functionality for company serializers

* Adds placeholder for custom admin class

* Update mantine styling

* spacing -> gap

* Add functionality to pre-process form data before upload

* Remove old references to download_queryset

* Improvements for data import drawer:

- Pin title at top of drawer

* Further improvements

* Fix column selection input

* Formatting improvements

* Use a <Stepper> component for better progress display

* Cleanup text

* Add export-only fields to BuildItem queryset

* Expand "export" fields for BuildItem dataset

* Skip backup and static steps in CI

* Remove hard-coded paths

* Fix for "accept_mapping" method

* Present required fields first on import session

* Add "get_importable_fields" method

* Add method for committing imported row to database

* Cleanup

* Save "complete" state after row import

* Allow prevention of column caching

* Remove debug statement

* Add basic admin table for import sessions

* Fix for table filter functions

- New mantine version requires string values

* Add filters for import session table

* Remove debug message

* fix for <FilterItem />

* Create new import session from admin page

* Cleanup playground

* Re-open an existing import session

* Memoize cell value

* Update <ImportDataSelector>

* Enable download of build line data

* Add extra detail fields

* Register data importers for the stock app

* Enable download of stock item tracking data

* Register importers for "company" app

* Register importers for the "order" app

* Add extra fields to purchase order line item serializer

* Update verbose names for order models

* Cleanup import data table rendering

* Pass session information through to cell renderer

* add separate 'field_overrides' field

* Expose 'field_overrides' to API

* Refactor import field selection

* Use override data if provided

* Fix data extraction

- Ignore columns which are not mapped

* Fix fields.pop

- Provide 'None' argument

* Update import data rendering

* Handle missing / empty column names when importing data

* Bug fixin'

* Update hook

* Adds button to upload data straight to table

* Cache "available_fields"

- Reduces API access time by 85%

* Fix calculation of completed_row_count

* Import individual rows from import session

* Allow import of multiple simultaneous records

* Improve extraction of metadata

- Especially for related fields
- Request object no longer required

* Implement suspended rendering of model instances

* Cleanup

* Implement more columns for StockTable

* Allow stock filtering by packaging field

* Fix "stock_value" column

* Improve metadata extraction

- Handle read_only_fields in Meta
- Handle write_only_fields in Meta

* Increase maximum number of importable rows

* Force data import to run on background worker

* Add export-only fields to StockItemSerializer class

* Data conversion when performing initial import

* Various tweaks

* Fix order of operations for data import

* Rename component

* Allow import/export of more model types

* Fix verbose name

* Import rows as a bulk db operation

* Enable download for PartCategoryTemplateTable

* Update stock item export

* Updates for unit tests

* Remove xls format for now

- Causes some bug in tablib
- Surely xlsx is OK?

* More unit test updates

* Future proof migration

* Updates

* unit tests

* Unit test fix

* Remove 'field_overrides'

- field_defaults will suffice

* Remove 'xls' as download option from frontend

* Add simple unit test for data import

* PUI tweaks

---------

Co-authored-by: Lukas <76838159+wolflu05@users.noreply.github.com>
2024-07-06 18:29:52 +10:00


"""Tasks for automating certain actions and interacting with InvenTree from the CLI."""
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
from pathlib import Path
from platform import python_version
from invoke import task
def checkPythonVersion():
"""Check that the installed python version meets minimum requirements.
If the python version is not sufficient, exits with a non-zero exit code.
"""
REQ_MAJOR = 3
REQ_MINOR = 9
version = sys.version.split(' ')[0]
valid = True
if sys.version_info.major < REQ_MAJOR:
valid = False
elif sys.version_info.major == REQ_MAJOR and sys.version_info.minor < REQ_MINOR:
valid = False
if not valid:
print(f'The installed python version ({version}) is not supported!')
print(f'InvenTree requires Python {REQ_MAJOR}.{REQ_MINOR} or above')
sys.exit(1)
if __name__ in ['__main__', 'tasks']:
checkPythonVersion()
def apps():
"""Returns a list of installed apps."""
return [
'build',
'common',
'company',
'importer',
'machine',
'order',
'part',
'report',
'stock',
'users',
'plugin',
'InvenTree',
'generic',
'web',
]
def content_excludes(
allow_auth: bool = True,
allow_tokens: bool = True,
allow_plugins: bool = True,
allow_sso: bool = True,
):
"""Returns a list of content types to exclude from import/export.
Arguments:
allow_tokens (bool): Allow tokens to be exported/importe
allow_plugins (bool): Allow plugin information to be exported/imported
allow_sso (bool): Allow SSO tokens to be exported/imported
"""
excludes = [
'contenttypes',
'auth.permission',
'error_report.error',
'admin.logentry',
'django_q.schedule',
'django_q.task',
'django_q.ormq',
'exchange.rate',
'exchange.exchangebackend',
'common.notificationentry',
'common.notificationmessage',
'user_sessions.session',
'importer.dataimportsession',
'importer.dataimportcolumnmap',
'importer.dataimportrow',
'report.labeloutput',
'report.reportoutput',
]
# Optionally exclude user auth data
if not allow_auth:
excludes.append('auth.group')
excludes.append('auth.user')
# Optionally exclude user token information
if not allow_tokens:
excludes.append('users.apitoken')
# Optionally exclude plugin information
if not allow_plugins:
excludes.append('plugin.pluginconfig')
excludes.append('plugin.pluginsetting')
# Optionally exclude SSO application information
if not allow_sso:
excludes.append('socialaccount.socialapp')
excludes.append('socialaccount.socialtoken')
return ' '.join([f'--exclude {e}' for e in excludes])
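# Example: the default call returns a single string of '--exclude' flags, e.g.
# '--exclude contenttypes --exclude auth.permission ... --exclude report.reportoutput',
# which is appended verbatim to the dumpdata/loaddata commands used below.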
def localDir() -> Path:
"""Returns the directory of *THIS* file.
Used to ensure that the various scripts always run
in the correct directory.
"""
return Path(__file__).parent.resolve()
def managePyDir():
"""Returns the directory of the manage.py file."""
return localDir().joinpath('src', 'backend', 'InvenTree')
def managePyPath():
"""Return the path of the manage.py file."""
return managePyDir().joinpath('manage.py')
def manage(c, cmd, pty: bool = False):
"""Runs a given command against django's "manage.py" script.
Args:
c: Command line context.
cmd: Django command to run.
pty (bool, optional): Run an interactive session. Defaults to False.
"""
c.run(
'cd "{path}" && python3 manage.py {cmd}'.format(path=managePyDir(), cmd=cmd),
pty=pty,
)
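# Example: manage(c, 'migrate', pty=True) executes
# cd "<localDir>/src/backend/InvenTree" && python3 manage.py migrate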
def yarn(c, cmd, pty: bool = False):
"""Runs a given command against the yarn package manager.
Args:
c: Command line context.
cmd: Yarn command to run.
pty (bool, optional): Run an interactive session. Defaults to False.
"""
path = localDir().joinpath('src').joinpath('frontend')
c.run(f'cd "{path}" && {cmd}', pty=pty)
def node_available(versions: bool = False, bypass_yarn: bool = False):
"""Checks if the frontend environment (ie node and yarn in bash) is available."""
def ret(val, val0=None, val1=None):
if versions:
return val, val0, val1
return val
def check(cmd):
try:
return str(
subprocess.check_output([cmd], stderr=subprocess.STDOUT, shell=True),
encoding='utf-8',
).strip()
except subprocess.CalledProcessError:
return None
except FileNotFoundError:
return None
yarn_version = check('yarn --version')
node_version = check('node --version')
# Either yarn is available or we don't care about yarn
yarn_passes = bypass_yarn or yarn_version
# Print a warning if node is available but yarn is not
if node_version and not yarn_passes:
print(
'Node is available but yarn is not. Install yarn if you wish to build the frontend.'
)
# Return the result
return ret(yarn_passes and node_version, node_version, yarn_version)
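# Note: returns a single truthy/falsy result by default, or the tuple
# (available, node_version, yarn_version) when called with versions=True.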
def check_file_existance(filename: str, overwrite: bool = False):
"""Checks if a file exists and asks the user if it should be overwritten.
Args:
filename (str): Name of the file to check.
overwrite (bool, optional): Overwrite the file without asking. Defaults to False.
"""
if Path(filename).is_file() and overwrite is False:
response = input(
'Warning: file already exists. Do you want to overwrite? [y/N]: '
)
response = str(response).strip().lower()
if response not in ['y', 'yes']:
print('Cancelled export operation')
sys.exit(1)
# Install tasks
@task(help={'uv': 'Use UV (experimental package manager)'})
def plugins(c, uv=False):
"""Installs all plugins as specified in 'plugins.txt'."""
from src.backend.InvenTree.InvenTree.config import get_plugin_file
plugin_file = get_plugin_file()
print(f"Installing plugin packages from '{plugin_file}'")
# Install the plugins
if not uv:
c.run(f"pip3 install --disable-pip-version-check -U -r '{plugin_file}'")
else:
c.run('pip3 install --no-cache-dir --disable-pip-version-check uv')
c.run(f"uv pip install -r '{plugin_file}'")
# Collect plugin static files
manage(c, 'collectplugins')
@task(help={'uv': 'Use UV package manager (experimental)'})
def install(c, uv=False):
"""Installs required python packages."""
INSTALL_FILE = 'src/backend/requirements.txt'
print(f"Installing required python packages from '{INSTALL_FILE}'")
if not Path(INSTALL_FILE).is_file():
raise FileNotFoundError(f"Requirements file '{INSTALL_FILE}' not found")
# Install required Python packages with PIP
if not uv:
c.run(
'pip3 install --no-cache-dir --disable-pip-version-check -U pip setuptools'
)
c.run(
f'pip3 install --no-cache-dir --disable-pip-version-check -U --require-hashes -r {INSTALL_FILE}'
)
else:
c.run(
'pip3 install --no-cache-dir --disable-pip-version-check -U uv setuptools'
)
c.run(f'uv pip install -U --require-hashes -r {INSTALL_FILE}')
# Run plugins install
plugins(c, uv=uv)
# Compile license information
lic_path = managePyDir().joinpath('InvenTree', 'licenses.txt')
c.run(
f'pip-licenses --format=json --with-license-file --no-license-path > {lic_path}'
)
@task(help={'tests': 'Set up test dataset at the end'})
def setup_dev(c, tests=False):
"""Sets up everything needed for the dev environment."""
print("Installing required python packages from 'src/backend/requirements-dev.txt'")
# Install required Python packages with PIP
c.run('pip3 install -U --require-hashes -r src/backend/requirements-dev.txt')
# Install pre-commit hook
print('Installing pre-commit for checks before git commits...')
c.run('pre-commit install')
# Update all the hooks
c.run('pre-commit autoupdate')
print('pre-commit setup is done...')
# Set up test-data if flag is set
if tests:
setup_test(c)
# Setup / maintenance tasks
@task
def superuser(c):
"""Create a superuser/admin account for the database."""
manage(c, 'createsuperuser', pty=True)
@task
def rebuild_models(c):
"""Rebuild database models with MPTT structures."""
manage(c, 'rebuild_models', pty=True)
@task
def rebuild_thumbnails(c):
"""Rebuild missing image thumbnails."""
manage(c, 'rebuild_thumbnails', pty=True)
@task
def clean_settings(c):
"""Clean the setting tables of old settings."""
manage(c, 'clean_settings')
@task(help={'mail': "Email address of the user whose MFA should be disabled"})
def remove_mfa(c, mail=''):
"""Remove MFA for a user."""
if not mail:
print('You must provide the email address of a user')
return
manage(c, f'remove_mfa {mail}')
@task(help={'frontend': 'Build the frontend', 'clear': 'Remove existing static files'})
def static(c, frontend=False, clear=True):
"""Copies required static files to the STATIC_ROOT directory, as per Django requirements."""
manage(c, 'prerender')
if frontend and node_available():
frontend_trans(c)
frontend_build(c)
print('Collecting static files...')
cmd = 'collectstatic --no-input --verbosity 0'
if clear:
cmd += ' --clear'
manage(c, cmd)
# Collect plugin static files
manage(c, 'collectplugins')
@task
def translate_stats(c):
"""Collect translation stats.
The file generated from this is needed for the UI.
"""
# Recompile the translation files (.mo)
# We do not run 'invoke translate' here, as that will touch the source (.po) files too!
try:
manage(c, 'compilemessages', pty=True)
except Exception:
print('WARNING: Translation files could not be compiled')
path = Path('src', 'backend', 'InvenTree', 'script', 'translation_stats.py')
c.run(f'python3 {path}')
@task(post=[translate_stats])
def translate(c, ignore_static=False, no_frontend=False):
"""Rebuild translation source files. Advanced use only!
Note: This command should not be used on a local install,
it is performed as part of the InvenTree translation toolchain.
"""
# Translate applicable .py / .html / .js files
manage(c, 'makemessages --all -e py,html,js --no-wrap')
manage(c, 'compilemessages')
if not no_frontend and node_available():
frontend_install(c)
frontend_trans(c)
frontend_build(c)
# Update static files
if not ignore_static:
static(c)
@task(
help={
'clean': 'Clean up old backup files',
'path': 'Specify path for generated backup files (leave blank for default path)',
}
)
def backup(c, clean=False, path=None):
"""Backup the database and media files."""
print('Backing up InvenTree database...')
cmd = '--noinput --compress -v 2'
if path:
cmd += f' -O {path}'
if clean:
cmd += ' --clean'
manage(c, f'dbbackup {cmd}')
print('Backing up InvenTree media files...')
manage(c, f'mediabackup {cmd}')
@task(
help={
'path': 'Specify path to locate backup files (leave blank for default path)',
'db_file': 'Specify filename of compressed database archive (leave blank to use most recent backup)',
'media_file': 'Specify filename of compressed media archive (leave blank to use most recent backup)',
'ignore_media': 'Do not import media archive (database restore only)',
'ignore_database': 'Do not import database archive (media restore only)',
}
)
def restore(
c,
path=None,
db_file=None,
media_file=None,
ignore_media=False,
ignore_database=False,
):
"""Restore the database and media files."""
base_cmd = '--noinput --uncompress -v 2'
if path:
base_cmd += f' -I {path}'
if ignore_database:
print('Skipping database archive...')
else:
print('Restoring InvenTree database')
cmd = f'dbrestore {base_cmd}'
if db_file:
cmd += f' -i {db_file}'
manage(c, cmd)
if ignore_media:
print('Skipping media restore...')
else:
print('Restoring InvenTree media files')
cmd = f'mediarestore {base_cmd}'
if media_file:
cmd += f' -i {media_file}'
manage(c, cmd)
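# Example: restore only the most recent database backup, leaving media files untouched:
# invoke restore --ignore-media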
@task(post=[rebuild_models, rebuild_thumbnails])
def migrate(c):
"""Performs database migrations.
This is a critical step if the database schema has been altered!
"""
print('Running InvenTree database migrations...')
print('========================================')
# Run custom management command which wraps migrations in "maintenance mode"
manage(c, 'makemigrations')
manage(c, 'runmigrations', pty=True)
manage(c, 'migrate --run-syncdb')
manage(c, 'remove_stale_contenttypes --include-stale-apps --no-input', pty=True)
print('========================================')
print('InvenTree database migrations completed!')
@task(help={'app': 'Specify an app to show migrations for (leave blank for all apps)'})
def showmigrations(c, app=''):
"""Show the migration status of the database."""
manage(c, f'showmigrations {app}', pty=True)
@task(
post=[clean_settings, translate_stats],
help={
'skip_backup': 'Skip database backup step (advanced users)',
'frontend': 'Force frontend compilation/download step (ignores INVENTREE_DOCKER)',
'no_frontend': 'Skip frontend compilation/download step',
'skip_static': 'Skip static file collection step',
'uv': 'Use UV (experimental package manager)',
},
)
def update(
c,
skip_backup: bool = False,
frontend: bool = False,
no_frontend: bool = False,
skip_static: bool = False,
uv: bool = False,
):
"""Update InvenTree installation.
This command should be invoked after source code has been updated,
e.g. downloading new code from GitHub.
The following tasks are performed, in order:
- install
- backup (optional)
- migrate
- frontend_compile or frontend_download (optional)
- static (optional)
- clean_settings
- translate_stats
"""
# Ensure required components are installed
install(c, uv=uv)
if not skip_backup:
backup(c)
# Perform database migrations
migrate(c)
# Skip building/downloading the frontend if either:
# - INVENTREE_DOCKER is set (e.g. by the docker image) and not overridden by the `--frontend` flag
# - the `--no-frontend` flag is set
if (os.environ.get('INVENTREE_DOCKER', False) and not frontend) or no_frontend:
print('Skipping frontend update!')
frontend = False
no_frontend = True
else:
print('Updating frontend...')
# Decide if we should compile the frontend or try to download it
if node_available(bypass_yarn=True):
frontend_compile(c)
else:
frontend_download(c)
if not skip_static:
static(c, frontend=not no_frontend)
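# Typical invocations of this task:
# invoke update # full update: backup, migrate, frontend, static
# invoke update --skip-backup --no-frontend # minimal update (advanced users)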
# Data tasks
@task(
help={
'filename': "Output filename (default = 'data.json')",
'overwrite': 'Overwrite existing files without asking first (default = False)',
'include_permissions': 'Include user and group permissions in the output file (default = False)',
'include_tokens': 'Include API tokens in the output file (default = False)',
'exclude_plugins': 'Exclude plugin data from the output file (default = False)',
'include_sso': 'Include SSO token data in the output file (default = False)',
'retain_temp': 'Retain temporary files (containing permissions) at end of process (default = False)',
}
)
def export_records(
c,
filename='data.json',
overwrite=False,
include_permissions=False,
include_tokens=False,
exclude_plugins=False,
include_sso=False,
retain_temp=False,
):
"""Export all database records to a file.
Write data to the file defined by filename.
If --overwrite is not set, the user will be prompted about overwriting an existing files.
If --include-permissions is not set, the file defined by filename will have permissions specified for a user or group removed.
If --delete-temp is not set, the temporary file (which includes permissions) will not be deleted. This file is named filename.tmp
For historical reasons, calling this function without any arguments will thus result in two files:
- data.json: does not include permissions
- data.json.tmp: includes permissions
If you want the script to overwrite any existing files without asking, add argument -o / --overwrite.
If you only want one file, add argument - d / --delete-temp.
If you want only one file, with permissions, then additionally add argument -i / --include-permissions
"""
# Get an absolute path to the file
if not os.path.isabs(filename):
filename = localDir().joinpath(filename).resolve()
print(f"Exporting database records to file '{filename}'")
check_file_existance(filename, overwrite)
tmpfile = f'{filename}.tmp'
excludes = content_excludes(
allow_tokens=include_tokens,
allow_plugins=not exclude_plugins,
allow_sso=include_sso,
)
cmd = f"dumpdata --natural-foreign --indent 2 --output '{tmpfile}' {excludes}"
# Dump data to temporary file
manage(c, cmd, pty=True)
print('Running data post-processing step...')
# Post-process the file, to remove any "permissions" specified for a user or group
with open(tmpfile, 'r') as f_in:
data = json.loads(f_in.read())
data_out = []
if include_permissions is False:
for entry in data:
model_name = entry.get('model', None)
# Ignore any temporary settings (start with underscore)
if model_name in ['common.inventreesetting', 'common.inventreeusersetting']:
if entry['fields'].get('key', '').startswith('_'):
continue
if model_name == 'auth.group':
entry['fields']['permissions'] = []
if model_name == 'auth.user':
entry['fields']['user_permissions'] = []
data_out.append(entry)
# Write the processed data to file
with open(filename, 'w') as f_out:
f_out.write(json.dumps(data_out, indent=2))
print('Data export completed')
if not retain_temp:
print('Removing temporary files')
os.remove(tmpfile)
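# Example: export all records except plugin data, keeping the temporary file:
# invoke export-records --filename backup.json --exclude-plugins --retain-temp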
@task(
help={
'filename': 'Input filename',
'clear': 'Clear existing data before import',
'retain_temp': 'Retain temporary files at end of process (default = False)',
},
post=[rebuild_models, rebuild_thumbnails],
)
def import_records(
c, filename='data.json', clear: bool = False, retain_temp: bool = False
):
"""Import database records from a file."""
# Get an absolute path to the supplied filename
if not os.path.isabs(filename):
filename = localDir().joinpath(filename)
if not os.path.exists(filename):
print(f"Error: File '{filename}' does not exist")
sys.exit(1)
if clear:
delete_data(c, force=True)
print(f"Importing database records from '{filename}'")
# We need to load 'auth' data (users / groups) *first*
# This is due to the users.owner model, which has a ContentType foreign key
authfile = f'{filename}.auth.json'
# Pre-process the data, to remove any "permissions" specified for a user or group
datafile = f'{filename}.data.json'
with open(filename, 'r') as f_in:
try:
data = json.loads(f_in.read())
except json.JSONDecodeError as exc:
print(f'Error: Failed to decode JSON file: {exc}')
sys.exit(1)
auth_data = []
load_data = []
for entry in data:
if 'model' in entry:
# Clear out any permissions specified for a group
if entry['model'] == 'auth.group':
entry['fields']['permissions'] = []
# Clear out any permissions specified for a user
if entry['model'] == 'auth.user':
entry['fields']['user_permissions'] = []
# Save auth data for later
if entry['model'].startswith('auth.'):
auth_data.append(entry)
else:
load_data.append(entry)
else:
print('Warning: Invalid entry in data file')
print(entry)
# Write the auth file data
with open(authfile, 'w') as f_out:
f_out.write(json.dumps(auth_data, indent=2))
# Write the processed data to the tmp file
with open(datafile, 'w') as f_out:
f_out.write(json.dumps(load_data, indent=2))
excludes = content_excludes(allow_auth=False)
# Import auth models first
print('Importing user auth data...')
cmd = f"loaddata '{authfile}'"
manage(c, cmd, pty=True)
# Import everything else next
print('Importing database records...')
cmd = f"loaddata '{datafile}' -i {excludes}"
manage(c, cmd, pty=True)
if not retain_temp:
print('Removing temporary files')
os.remove(datafile)
os.remove(authfile)
print('Data import completed')
@task
def delete_data(c, force=False):
"""Delete all database records!
Warning: This will REALLY delete all records in the database!!
"""
print('Deleting all data from InvenTree database...')
if force:
manage(c, 'flush --noinput')
else:
manage(c, 'flush')
@task(post=[rebuild_models, rebuild_thumbnails])
def import_fixtures(c):
"""Import fixture data into the database.
This command imports all existing test fixture data into the database.
Warning:
- Intended for testing / development only!
- Running this command may overwrite existing database data!!
- Don't say you were not warned...
"""
fixtures = [
# Build model
'build',
# Common models
'settings',
# Company model
'company',
'price_breaks',
'supplier_part',
# Order model
'order',
# Part model
'bom',
'category',
'params',
'part',
'test_templates',
# Stock model
'location',
'stock_tests',
'stock',
# Users
'users',
]
command = 'loaddata ' + ' '.join(fixtures)
manage(c, command, pty=True)
# Execution tasks
@task
def wait(c):
"""Wait until the database connection is ready."""
return manage(c, 'wait_for_db')
@task(
pre=[wait],
help={
'address': 'Server address:port (default=0.0.0.0:8000)',
'workers': 'Specify number of worker threads (override config file)',
},
)
def gunicorn(c, address='0.0.0.0:8000', workers=None):
"""Launch a gunicorn webserver.
Note: This server will not auto-reload in response to code changes.
"""
here = os.path.dirname(os.path.abspath(__file__))
config_file = os.path.join(here, 'contrib', 'container', 'gunicorn.conf.py')
chdir = os.path.join(here, 'src', 'backend', 'InvenTree')
cmd = f'gunicorn -c {config_file} InvenTree.wsgi -b {address} --chdir {chdir}'
if workers:
cmd += f' --workers={workers}'
print('Starting Gunicorn Server:')
print(cmd)
c.run(cmd, pty=True)
@task(pre=[wait], help={'address': 'Server address:port (default=127.0.0.1:8000)'})
def server(c, address='127.0.0.1:8000'):
"""Launch a (development) server using Django's in-built webserver.
Note: This is *not* sufficient for a production installation.
"""
manage(c, 'runserver {address}'.format(address=address), pty=True)
@task(pre=[wait])
def worker(c):
"""Run the InvenTree background worker process."""
manage(c, 'qcluster', pty=True)
# Testing tasks
@task
def render_js_files(c):
"""Render templated javascript files (used for static testing)."""
manage(c, 'test InvenTree.ci_render_js')
@task(post=[translate_stats, static, server])
def test_translations(c):
"""Add a fictional language to test if each component is ready for translations."""
import django
from django.conf import settings
# setup django
base_path = Path.cwd()
new_base_path = pathlib.Path('InvenTree').resolve()
sys.path.append(str(new_base_path))
os.chdir(new_base_path)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'InvenTree.settings')
django.setup()
# Add language
print('Add dummy language...')
print('========================================')
manage(c, 'makemessages -e py,html,js --no-wrap -l xx')
# change translation
print('Fill in dummy translations...')
print('========================================')
file_path = pathlib.Path(settings.LOCALE_PATHS[0], 'xx', 'LC_MESSAGES', 'django.po')
new_file_path = Path(str(file_path) + '_new')
# compile regex
reg = re.compile(
r'[a-zA-Z0-9]{1}' # match any single letter and number # noqa: W504
+ r'(?![^{\(\<]*[}\)\>])' # that is not inside curly brackets, brackets or a tag # noqa: W504
+ r'(?<![^\%][^\(][)][a-z])' # that is not a specially formatted variable with singles # noqa: W504
+ r'(?![^\\][\n])' # that is not a newline
)
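# Example effect of the substitution below: msgid "Part {part} created"
# becomes msgstr "xxxx {part} xxxxxxx" - letters and digits are replaced
# with 'x', while placeholders, tags and escaped newlines survive.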
last_string = ''
# loop through input file lines
with open(file_path, 'rt') as file_org:
with open(new_file_path, 'wt') as file_new:
for line in file_org:
if line.startswith('msgstr "'):
# write output -> replace regex matches with x in the read in (multi)string
file_new.write(f'msgstr "{reg.sub("x", last_string[7:-2])}"\n')
last_string = '' # reset (multi)string
elif line.startswith('msgid "'):
last_string = (
last_string + line
) # a new translatable string starts -> start append
file_new.write(line)
else:
if last_string:
last_string = (
last_string + line
) # a string is being read in -> continue appending
file_new.write(line)
# change out translation files
file_path.rename(str(file_path) + '_old')
new_file_path.rename(file_path)
# compile languages
print('Compile languages ...')
print('========================================')
manage(c, 'compilemessages')
# reset cwd
os.chdir(base_path)
# set env flag
os.environ['TEST_TRANSLATIONS'] = 'True'
@task(
help={
'disable_pty': 'Disable PTY',
'runtest': 'Specify which tests to run, in format <module>.<file>.<class>.<method>',
'migrations': 'Run migration unit tests',
'report': 'Display a report of slow tests',
'coverage': 'Run code coverage analysis (requires coverage package)',
'cui': 'Do not run CUI tests',
}
)
def test(
c,
disable_pty=False,
runtest='',
migrations=False,
report=False,
coverage=False,
cui=False,
):
"""Run unit-tests for InvenTree codebase.
To run only certain tests, use the argument --runtest.
This can filter all the way down to:
<module>.<file>.<class>.<method>
Example:
test --runtest=company.test_api
will run tests in the company/test_api.py file.
"""
# Run sanity check on the django install
manage(c, 'check')
pty = not disable_pty
_apps = ' '.join(apps())
cmd = 'test'
if runtest:
# Specific tests to run
cmd += f' {runtest}'
else:
# Run all tests
cmd += f' {_apps}'
if report:
cmd += ' --slowreport'
if migrations:
cmd += ' --tag migration_test'
else:
cmd += ' --exclude-tag migration_test'
if cui:
cmd += ' --exclude-tag=cui'
if coverage:
# Run tests within coverage environment, and generate report
c.run(f'coverage run {managePyPath()} {cmd}')
c.run('coverage xml -i')
else:
# Run simple test runner, without coverage
manage(c, cmd, pty=pty)
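# Examples:
# invoke test --runtest=company.test_api # run a single test module
# invoke test --coverage # run the full suite with coverage analysis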
@task(help={'dev': 'Set up development environment at the end'})
def setup_test(c, ignore_update=False, dev=False, path='inventree-demo-dataset'):
"""Setup a testing environment."""
from src.backend.InvenTree.InvenTree.config import get_media_dir
if not ignore_update:
update(c)
# Remove old data directory
if os.path.exists(path):
print('Removing old data ...')
c.run(f'rm {path} -r')
# Get test data
print('Cloning demo dataset ...')
c.run(f'git clone https://github.com/inventree/demo-dataset {path} -v --depth=1')
print('========================================')
# Make sure migrations are done - might have just deleted sqlite database
if not ignore_update:
migrate(c)
# Load data
print('Loading database records ...')
import_records(c, filename=f'{path}/inventree_data.json', clear=True)
# Copy media files
print('Copying media files ...')
src = Path(path).joinpath('media').resolve()
dst = get_media_dir()
print(f'Copying media files - "{src}" to "{dst}"')
shutil.copytree(src, dst, dirs_exist_ok=True)
print('Done setting up test environment...')
print('========================================')
# Set up development setup if flag is set
if dev:
setup_dev(c)
@task(
help={
'filename': "Output filename (default = 'schema.yml')",
'overwrite': 'Overwrite existing files without asking first (default = off/False)',
}
)
def schema(c, filename='schema.yml', overwrite=False, ignore_warnings=False):
"""Export current API schema."""
check_file_existance(filename, overwrite)
filename = os.path.abspath(filename)
print(f"Exporting schema file to '{filename}'")
cmd = f'spectacular --file {filename} --validate --color'
if not ignore_warnings:
cmd += ' --fail-on-warn'
manage(c, cmd, pty=True)
assert os.path.exists(filename)
print('Schema export completed:', filename)
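# Example: invoke schema --filename api.yml --overwrite --ignore-warnings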
@task(default=True)
def version(c):
"""Show the current version of InvenTree."""
import src.backend.InvenTree.InvenTree.version as InvenTreeVersion
from src.backend.InvenTree.InvenTree.config import (
get_config_file,
get_media_dir,
get_static_dir,
)
# Gather frontend version information
_, node, yarn = node_available(versions=True)
print(
f"""
InvenTree - inventree.org
The Open-Source Inventory Management System\n
Installation paths:
Base {localDir()}
Config {get_config_file()}
Media {get_media_dir()}
Static {get_static_dir()}
Versions:
Python {python_version()}
Django {InvenTreeVersion.inventreeDjangoVersion()}
InvenTree {InvenTreeVersion.inventreeVersion()}
API {InvenTreeVersion.inventreeApiVersion()}
Node {node if node else 'N/A'}
Yarn {yarn if yarn else 'N/A'}
Commit hash: {InvenTreeVersion.inventreeCommitHash()}
Commit date: {InvenTreeVersion.inventreeCommitDate()}"""
)
if len(sys.argv) == 1 and sys.argv[0].startswith('/opt/inventree/env/lib/python'):
print(
"""
You are probably running the package installer / single-line installer. Please mention this in any bug reports!
Use '--list' for a list of available commands
Use '--help' for help on a specific command"""
)
@task()
def frontend_check(c):
"""Check if frontend is available."""
print(node_available())
@task
def frontend_compile(c):
"""Generate react frontend.
Args:
c: Context variable
"""
print('Compiling frontend code...')
frontend_install(c)
frontend_trans(c)
frontend_build(c)
@task
def frontend_install(c):
"""Install frontend requirements.
Args:
c: Context variable
"""
print('Installing frontend dependencies')
yarn(c, 'yarn install')
@task
def frontend_trans(c):
"""Compile frontend translations.
Args:
c: Context variable
"""
print('Compiling frontend translations')
yarn(c, 'yarn run extract')
yarn(c, 'yarn run compile')
@task
def frontend_build(c):
"""Build frontend.
Args:
c: Context variable
"""
print('Building frontend')
yarn(c, 'yarn run build --emptyOutDir')
@task
def frontend_dev(c):
"""Start frontend development server.
Args:
c: Context variable
"""
print('Starting frontend development server')
yarn(c, 'yarn run compile')
yarn(c, 'yarn run dev')
@task(
help={
'ref': 'git ref, default: current git ref',
'tag': 'git tag to look for release',
'file': 'destination to frontend-build.zip file',
'repo': 'GitHub repository, default: InvenTree/inventree',
'extract': 'Also extract and place at the correct destination, default: True',
'clean': 'Delete old files from InvenTree/web/static/web first, default: True',
}
)
def frontend_download(
c,
ref=None,
tag=None,
file=None,
repo='InvenTree/inventree',
extract=True,
clean=True,
):
"""Download a pre-build frontend from GitHub if you dont want to install nodejs on your machine.
There are 3 possibilities to install the frontend:
1. invoke frontend-download --ref 01f2aa5f746a36706e9a5e588c4242b7bf1996d5
if ref is omitted, it tries to auto detect the current git ref via `git rev-parse HEAD`.
Note: GitHub doesn't allow workflow artifacts to be downloaded from anonymous users, so
this will output a link where you can download the frontend with a signed in browser
and then continue with option 3
2. invoke frontend-download --tag 0.13.0
Downloads the frontend build from the releases.
3. invoke frontend-download --file /home/vscode/Downloads/frontend-build.zip
This will extract your zip file and place the contents at the correct destination
"""
import functools
import subprocess
from tempfile import NamedTemporaryFile
from zipfile import ZipFile
import requests
print('Downloading frontend...')
# globals
default_headers = {'Accept': 'application/vnd.github.v3+json'}
# helper functions
def find_resource(resource, key, value):
for obj in resource:
if obj[key] == value:
return obj
return None
def handle_extract(file):
# if no extract is requested, exit here
if not extract:
return
dest_path = Path(__file__).parent / 'src/backend' / 'InvenTree/web/static/web'
# if clean, delete static/web directory
if clean:
shutil.rmtree(dest_path, ignore_errors=True)
os.makedirs(dest_path)
print(f'Cleaned directory: {dest_path}')
# unzip build to static folder
with ZipFile(file, 'r') as zip_ref:
zip_ref.extractall(dest_path)
print(f'Unzipped downloaded frontend build to: {dest_path}')
def handle_download(url):
# download frontend-build.zip to temporary file
with requests.get(
url, headers=default_headers, stream=True, allow_redirects=True
) as response, NamedTemporaryFile(suffix='.zip') as dst:
response.raise_for_status()
# auto decode the gzipped raw data
response.raw.read = functools.partial(
response.raw.read, decode_content=True
)
with open(dst.name, 'wb') as f:
shutil.copyfileobj(response.raw, f)
print(f'Downloaded frontend build to temporary file: {dst.name}')
handle_extract(dst.name)
# if zip file is specified, try to extract it directly
if file:
handle_extract(file)
return
# check arguments
if ref is not None and tag is not None:
print('[ERROR] Do not set ref and tag.')
return
if ref is None and tag is None:
try:
ref = subprocess.check_output(
['git', 'rev-parse', 'HEAD'], encoding='utf-8'
).strip()
except Exception:
print("[ERROR] Cannot get current ref via 'git rev-parse HEAD'")
return
if ref is None and tag is None:
print('[ERROR] Either ref or tag needs to be set.')
return
if tag:
tag = tag.lstrip('v')
try:
handle_download(
f'https://github.com/{repo}/releases/download/{tag}/frontend-build.zip'
)
except Exception as e:
if not isinstance(e, requests.HTTPError):
raise e
print(
f"""[ERROR] An Error occurred. Unable to download frontend build, release or build does not exist,
try downloading the frontend-build.zip yourself via: https://github.com/{repo}/releases
Then try continuing by running: invoke frontend-download --file <path-to-downloaded-zip-file>"""
)
return
if ref:
# get workflow run from all workflow runs on that particular ref
workflow_runs = requests.get(
f'https://api.github.com/repos/{repo}/actions/runs?head_sha={ref}',
headers=default_headers,
).json()
if not (qc_run := find_resource(workflow_runs['workflow_runs'], 'name', 'QC')):
print('[ERROR] Cannot find any workflow runs for current sha')
return
print(
f"Found workflow {qc_run['name']} (run {qc_run['run_number']}-{qc_run['run_attempt']})"
)
# get frontend-build artifact from all artifacts available for this workflow run
artifacts = requests.get(
qc_run['artifacts_url'], headers=default_headers
).json()
if not (
frontend_artifact := find_resource(
artifacts['artifacts'], 'name', 'frontend-build'
)
):
print('[ERROR] Cannot find frontend-build.zip attachment for current sha')
return
print(
f"Found artifact {frontend_artifact['name']} with id {frontend_artifact['id']} ({frontend_artifact['size_in_bytes'] / 1e6:.2f}MB)."
)
print(
f"""
GitHub doesn't allow artifact downloads from anonymous users. Either download the following file
via your signed in browser, or consider using a point release download via invoke frontend-download --tag <git-tag>
Download: https://github.com/{repo}/suites/{qc_run['check_suite_id']}/artifacts/{frontend_artifact['id']} manually and
continue by running: invoke frontend-download --file <path-to-downloaded-zip-file>"""
)
@task(
help={
'address': 'Host and port to run the server on (default: localhost:8080)',
'compile_schema': 'Compile the schema documentation first (default: False)',
}
)
def docs_server(c, address='localhost:8080', compile_schema=False):
"""Start a local mkdocs server to view the documentation."""
if compile_schema:
# Build the schema docs first
schema(c, ignore_warnings=True, overwrite=True, filename='docs/schema.yml')
c.run('python docs/extract_schema.py docs/schema.yml')
c.run(f'mkdocs serve -a {address} -f docs/mkdocs.yml')