mirror of
https://github.com/inventree/InvenTree
synced 2024-08-30 18:33:04 +00:00
Use more pathlib instead of os (#7589)
* unify path usage * remove usage of `os` for setup_test * more os usage * use mkdir instead
This commit is contained in:
parent
633873365d
commit
1ac6df82e3
70
tasks.py
70
tasks.py
@ -161,7 +161,7 @@ def yarn(c, cmd, pty: bool = False):
|
||||
cmd: Yarn command to run.
|
||||
pty (bool, optional): Run an interactive session. Defaults to False.
|
||||
"""
|
||||
path = localDir().joinpath('src').joinpath('frontend')
|
||||
path = localDir().joinpath('src', 'frontend')
|
||||
c.run(f'cd "{path}" && {cmd}', pty=pty)
|
||||
|
||||
|
||||
@ -200,14 +200,14 @@ def node_available(versions: bool = False, bypass_yarn: bool = False):
|
||||
return ret(yarn_passes and node_version, node_version, yarn_version)
|
||||
|
||||
|
||||
def check_file_existance(filename: str, overwrite: bool = False):
|
||||
def check_file_existance(filename: Path, overwrite: bool = False):
|
||||
"""Checks if a file exists and asks the user if it should be overwritten.
|
||||
|
||||
Args:
|
||||
filename (str): Name of the file to check.
|
||||
overwrite (bool, optional): Overwrite the file without asking. Defaults to False.
|
||||
"""
|
||||
if Path(filename).is_file() and overwrite is False:
|
||||
if filename.is_file() and overwrite is False:
|
||||
response = input(
|
||||
'Warning: file already exists. Do you want to overwrite? [y/N]: '
|
||||
)
|
||||
@ -363,7 +363,7 @@ def translate_stats(c):
|
||||
except Exception:
|
||||
print('WARNING: Translation files could not be compiled:')
|
||||
|
||||
path = Path('src', 'backend', 'InvenTree', 'script', 'translation_stats.py')
|
||||
path = managePyDir().joinpath('script', 'translation_stats.py')
|
||||
c.run(f'python3 {path}')
|
||||
|
||||
|
||||
@ -584,14 +584,15 @@ def export_records(
|
||||
If you want only one file, with permissions, then additionally add argument -i / --include-permissions
|
||||
"""
|
||||
# Get an absolute path to the file
|
||||
if not os.path.isabs(filename):
|
||||
filename = localDir().joinpath(filename).resolve()
|
||||
target = Path(filename)
|
||||
if not target.is_absolute():
|
||||
target = localDir().joinpath(filename).resolve()
|
||||
|
||||
print(f"Exporting database records to file '{filename}'")
|
||||
print(f"Exporting database records to file '{target}'")
|
||||
|
||||
check_file_existance(filename, overwrite)
|
||||
check_file_existance(target, overwrite)
|
||||
|
||||
tmpfile = f'{filename}.tmp'
|
||||
tmpfile = f'{target}.tmp'
|
||||
|
||||
excludes = content_excludes(
|
||||
allow_tokens=include_tokens,
|
||||
@ -630,7 +631,7 @@ def export_records(
|
||||
data_out.append(entry)
|
||||
|
||||
# Write the processed data to file
|
||||
with open(filename, 'w') as f_out:
|
||||
with open(target, 'w') as f_out:
|
||||
f_out.write(json.dumps(data_out, indent=2))
|
||||
|
||||
print('Data export completed')
|
||||
@ -653,26 +654,27 @@ def import_records(
|
||||
):
|
||||
"""Import database records from a file."""
|
||||
# Get an absolute path to the supplied filename
|
||||
if not os.path.isabs(filename):
|
||||
filename = localDir().joinpath(filename)
|
||||
target = Path(filename)
|
||||
if not target.is_absolute():
|
||||
target = localDir().joinpath(filename)
|
||||
|
||||
if not os.path.exists(filename):
|
||||
print(f"Error: File '{filename}' does not exist")
|
||||
if not target.exists():
|
||||
print(f"Error: File '{target}' does not exist")
|
||||
sys.exit(1)
|
||||
|
||||
if clear:
|
||||
delete_data(c, force=True)
|
||||
|
||||
print(f"Importing database records from '{filename}'")
|
||||
print(f"Importing database records from '{target}'")
|
||||
|
||||
# We need to load 'auth' data (users / groups) *first*
|
||||
# This is due to the users.owner model, which has a ContentType foreign key
|
||||
authfile = f'{filename}.auth.json'
|
||||
authfile = f'{target}.auth.json'
|
||||
|
||||
# Pre-process the data, to remove any "permissions" specified for a user or group
|
||||
datafile = f'{filename}.data.json'
|
||||
datafile = f'{target}.data.json'
|
||||
|
||||
with open(filename, 'r') as f_in:
|
||||
with open(target, 'r') as f_in:
|
||||
try:
|
||||
data = json.loads(f_in.read())
|
||||
except json.JSONDecodeError as exc:
|
||||
@ -804,11 +806,10 @@ def gunicorn(c, address='0.0.0.0:8000', workers=None):
|
||||
|
||||
Note: This server will not auto-reload in response to code changes.
|
||||
"""
|
||||
here = os.path.dirname(os.path.abspath(__file__))
|
||||
config_file = os.path.join(here, 'contrib', 'container', 'gunicorn.conf.py')
|
||||
chdir = os.path.join(here, 'src', 'backend', 'InvenTree')
|
||||
|
||||
cmd = f'gunicorn -c {config_file} InvenTree.wsgi -b {address} --chdir {chdir}'
|
||||
config_file = localDir().joinpath('contrib', 'container', 'gunicorn.conf.py')
|
||||
cmd = (
|
||||
f'gunicorn -c {config_file} InvenTree.wsgi -b {address} --chdir {managePyDir()}'
|
||||
)
|
||||
|
||||
if workers:
|
||||
cmd += f' --workers={workers}'
|
||||
@ -985,14 +986,18 @@ def setup_test(c, ignore_update=False, dev=False, path='inventree-demo-dataset')
|
||||
if not ignore_update:
|
||||
update(c)
|
||||
|
||||
template_dir = localDir().joinpath(path)
|
||||
|
||||
# Remove old data directory
|
||||
if os.path.exists(path):
|
||||
if template_dir.exists():
|
||||
print('Removing old data ...')
|
||||
c.run(f'rm {path} -r')
|
||||
c.run(f'rm {template_dir} -r')
|
||||
|
||||
# Get test data
|
||||
print('Cloning demo dataset ...')
|
||||
c.run(f'git clone https://github.com/inventree/demo-dataset {path} -v --depth=1')
|
||||
c.run(
|
||||
f'git clone https://github.com/inventree/demo-dataset {template_dir} -v --depth=1'
|
||||
)
|
||||
print('========================================')
|
||||
|
||||
# Make sure migrations are done - might have just deleted sqlite database
|
||||
@ -1001,11 +1006,11 @@ def setup_test(c, ignore_update=False, dev=False, path='inventree-demo-dataset')
|
||||
|
||||
# Load data
|
||||
print('Loading database records ...')
|
||||
import_records(c, filename=f'{path}/inventree_data.json', clear=True)
|
||||
import_records(c, filename=template_dir.joinpath('inventree_data.json'), clear=True)
|
||||
|
||||
# Copy media files
|
||||
print('Copying media files ...')
|
||||
src = Path(path).joinpath('media').resolve()
|
||||
src = template_dir.joinpath('media')
|
||||
dst = get_media_dir()
|
||||
|
||||
print(f'Copying media files - "{src}" to "{dst}"')
|
||||
@ -1030,10 +1035,9 @@ def schema(
|
||||
c, filename='schema.yml', overwrite=False, ignore_warnings=False, no_default=False
|
||||
):
|
||||
"""Export current API schema."""
|
||||
filename = Path(filename).resolve()
|
||||
check_file_existance(filename, overwrite)
|
||||
|
||||
filename = os.path.abspath(filename)
|
||||
|
||||
print(f"Exporting schema file to '{filename}'")
|
||||
|
||||
cmd = f'spectacular --file {filename} --validate --color'
|
||||
@ -1055,7 +1059,7 @@ def schema(
|
||||
|
||||
manage(c, cmd, pty=True, env=envs)
|
||||
|
||||
assert os.path.exists(filename)
|
||||
assert filename.exists()
|
||||
|
||||
print('Schema export completed:', filename)
|
||||
|
||||
@ -1227,12 +1231,12 @@ def frontend_download(
|
||||
if not extract:
|
||||
return
|
||||
|
||||
dest_path = Path(__file__).parent / 'src/backend' / 'InvenTree/web/static/web'
|
||||
dest_path = managePyDir().joinpath('web', 'static', 'web')
|
||||
|
||||
# if clean, delete static/web directory
|
||||
if clean:
|
||||
shutil.rmtree(dest_path, ignore_errors=True)
|
||||
os.makedirs(dest_path)
|
||||
dest_path.mkdir()
|
||||
print(f'Cleaned directory: {dest_path}')
|
||||
|
||||
# unzip build to static folder
|
||||
|
Loading…
Reference in New Issue
Block a user