mirror of
https://github.com/welton89/RRBEC.git
synced 2026-04-06 05:55:42 +00:00
nem sei pq tantos arquivos
This commit is contained in:
@@ -0,0 +1,299 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import sys
|
||||
import importlib
|
||||
import inspect
|
||||
import traceback
|
||||
|
||||
from argparse import ArgumentTypeError
|
||||
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.core.management.base import CommandError
|
||||
|
||||
from django_extensions.management.email_notifications import EmailNotificationCommand
|
||||
from django_extensions.management.utils import signalcommand
|
||||
|
||||
|
||||
class DirPolicyChoices:
|
||||
NONE = 'none'
|
||||
EACH = 'each'
|
||||
ROOT = 'root'
|
||||
|
||||
|
||||
def check_is_directory(value):
|
||||
if value is None or not os.path.isdir(value):
|
||||
raise ArgumentTypeError("%s is not a directory!" % value)
|
||||
return value
|
||||
|
||||
|
||||
class BadCustomDirectoryException(Exception):
|
||||
def __init__(self, value):
|
||||
self.message = value + ' If --dir-policy is custom than you must set correct directory in ' \
|
||||
'--dir option or in settings.RUNSCRIPT_CHDIR'
|
||||
|
||||
def __str__(self):
|
||||
return self.message
|
||||
|
||||
|
||||
class Command(EmailNotificationCommand):
|
||||
help = 'Runs a script in django context.'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.current_directory = os.getcwd()
|
||||
self.last_exit_code = 0
|
||||
|
||||
def add_arguments(self, parser):
|
||||
super().add_arguments(parser)
|
||||
parser.add_argument('script', nargs='+')
|
||||
parser.add_argument(
|
||||
'--fixtures', action='store_true', dest='infixtures', default=False,
|
||||
help='Also look in app.fixtures subdir',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--noscripts', action='store_true', dest='noscripts', default=False,
|
||||
help='Do not look in app.scripts subdir',
|
||||
)
|
||||
parser.add_argument(
|
||||
'-s', '--silent', action='store_true', dest='silent', default=False,
|
||||
help='Run silently, do not show errors and tracebacks. Also implies --continue-on-error.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'-c', '--continue-on-error', action='store_true', dest='continue_on_error', default=False,
|
||||
help='Continue executing other scripts even though one has failed. '
|
||||
'It will print a traceback unless --no-traceback or --silent are given '
|
||||
'The exit code used when terminating will always be 1.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--no-traceback', action='store_true', dest='no_traceback', default=False,
|
||||
help='Do not show tracebacks',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--script-args', nargs='*', type=str,
|
||||
help='Space-separated argument list to be passed to the scripts. Note that the '
|
||||
'same arguments will be passed to all named scripts.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--dir-policy', type=str,
|
||||
choices=[DirPolicyChoices.NONE, DirPolicyChoices.EACH, DirPolicyChoices.ROOT],
|
||||
help='Policy of selecting scripts execution directory: '
|
||||
'none - start all scripts in current directory '
|
||||
'each - start all scripts in their directories '
|
||||
'root - start all scripts in BASE_DIR directory ',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--chdir', type=check_is_directory,
|
||||
help='If dir-policy option is set to custom, than this option determines script execution directory.',
|
||||
)
|
||||
|
||||
@signalcommand
|
||||
def handle(self, *args, **options):
|
||||
|
||||
NOTICE = self.style.SQL_TABLE
|
||||
NOTICE2 = self.style.SQL_FIELD
|
||||
ERROR = self.style.ERROR
|
||||
ERROR2 = self.style.NOTICE
|
||||
|
||||
subdirs = []
|
||||
scripts = options['script']
|
||||
|
||||
if not options['noscripts']:
|
||||
subdirs.append(getattr(settings, 'RUNSCRIPT_SCRIPT_DIR', 'scripts'))
|
||||
if options['infixtures']:
|
||||
subdirs.append('fixtures')
|
||||
verbosity = options["verbosity"]
|
||||
show_traceback = options['traceback']
|
||||
no_traceback = options['no_traceback']
|
||||
continue_on_error = options['continue_on_error']
|
||||
if no_traceback:
|
||||
show_traceback = False
|
||||
else:
|
||||
show_traceback = True
|
||||
silent = options['silent']
|
||||
if silent:
|
||||
verbosity = 0
|
||||
continue_on_error = True
|
||||
email_notifications = options['email_notifications']
|
||||
|
||||
if len(subdirs) < 1:
|
||||
print(NOTICE("No subdirs to run left."))
|
||||
return
|
||||
|
||||
if len(scripts) < 1:
|
||||
print(ERROR("Script name required."))
|
||||
return
|
||||
|
||||
def get_directory_from_chdir():
|
||||
directory = options['chdir'] or getattr(settings, 'RUNSCRIPT_CHDIR', None)
|
||||
try:
|
||||
check_is_directory(directory)
|
||||
except ArgumentTypeError as e:
|
||||
raise BadCustomDirectoryException(str(e))
|
||||
return directory
|
||||
|
||||
def get_directory_basing_on_policy(script_module):
|
||||
policy = options['dir_policy'] or getattr(settings, 'RUNSCRIPT_CHDIR_POLICY', DirPolicyChoices.NONE)
|
||||
if policy == DirPolicyChoices.ROOT:
|
||||
return settings.BASE_DIR
|
||||
elif policy == DirPolicyChoices.EACH:
|
||||
return os.path.dirname(inspect.getfile(script_module))
|
||||
else:
|
||||
return self.current_directory
|
||||
|
||||
def set_directory(script_module):
|
||||
if options['chdir']:
|
||||
directory = get_directory_from_chdir()
|
||||
elif options['dir_policy']:
|
||||
directory = get_directory_basing_on_policy(script_module)
|
||||
elif getattr(settings, 'RUNSCRIPT_CHDIR', None):
|
||||
directory = get_directory_from_chdir()
|
||||
else:
|
||||
directory = get_directory_basing_on_policy(script_module)
|
||||
os.chdir(os.path.abspath(directory))
|
||||
|
||||
def run_script(mod, *script_args):
|
||||
exit_code = None
|
||||
try:
|
||||
set_directory(mod)
|
||||
exit_code = mod.run(*script_args)
|
||||
if isinstance(exit_code, bool):
|
||||
# convert boolean True to exit-code 0 and False to exit-code 1
|
||||
exit_code = 1 if exit_code else 0
|
||||
if isinstance(exit_code, int):
|
||||
if exit_code != 0:
|
||||
try:
|
||||
raise CommandError("'%s' failed with exit code %s" % (mod.__name__, exit_code), returncode=exit_code)
|
||||
except TypeError:
|
||||
raise CommandError("'%s' failed with exit code %s" % (mod.__name__, exit_code))
|
||||
if email_notifications:
|
||||
self.send_email_notification(notification_id=mod.__name__)
|
||||
except Exception as e:
|
||||
if isinstance(e, CommandError) and hasattr(e, 'returncode'):
|
||||
exit_code = e.returncode
|
||||
self.last_exit_code = exit_code if isinstance(exit_code, int) else 1
|
||||
if silent:
|
||||
return
|
||||
if verbosity > 0:
|
||||
print(ERROR("Exception while running run() in '%s'" % mod.__name__))
|
||||
if continue_on_error:
|
||||
if show_traceback:
|
||||
traceback.print_exc()
|
||||
return
|
||||
if email_notifications:
|
||||
self.send_email_notification(notification_id=mod.__name__, include_traceback=True)
|
||||
|
||||
if no_traceback:
|
||||
raise CommandError(repr(e))
|
||||
|
||||
raise
|
||||
|
||||
def my_import(parent_package, module_name):
|
||||
full_module_path = "%s.%s" % (parent_package, module_name)
|
||||
if verbosity > 1:
|
||||
print(NOTICE("Check for %s" % full_module_path))
|
||||
# Try importing the parent package first
|
||||
try:
|
||||
importlib.import_module(parent_package)
|
||||
except ImportError as e:
|
||||
if str(e).startswith('No module named'):
|
||||
# No need to proceed if the parent package doesn't exist
|
||||
return False
|
||||
|
||||
try:
|
||||
t = importlib.import_module(full_module_path)
|
||||
except ImportError as e:
|
||||
# The parent package exists, but the module doesn't
|
||||
try:
|
||||
if importlib.util.find_spec(full_module_path) is None:
|
||||
return False
|
||||
except Exception:
|
||||
module_file = os.path.join(settings.BASE_DIR, *full_module_path.split('.')) + '.py'
|
||||
if not os.path.isfile(module_file):
|
||||
return False
|
||||
|
||||
if silent:
|
||||
return False
|
||||
if show_traceback:
|
||||
traceback.print_exc()
|
||||
if verbosity > 0:
|
||||
print(ERROR("Cannot import module '%s': %s." % (full_module_path, e)))
|
||||
|
||||
return False
|
||||
|
||||
if hasattr(t, "run"):
|
||||
if verbosity > 1:
|
||||
print(NOTICE2("Found script '%s' ..." % full_module_path))
|
||||
return t
|
||||
else:
|
||||
if verbosity > 1:
|
||||
print(ERROR2("Found script '%s' but no run() function found." % full_module_path))
|
||||
|
||||
def find_modules_for_script(script):
|
||||
""" Find script module which contains 'run' attribute """
|
||||
modules = []
|
||||
# first look in apps
|
||||
for app in apps.get_app_configs():
|
||||
for subdir in subdirs:
|
||||
mod = my_import("%s.%s" % (app.name, subdir), script)
|
||||
if mod:
|
||||
modules.append(mod)
|
||||
# try direct import
|
||||
if script.find(".") != -1:
|
||||
parent, mod_name = script.rsplit(".", 1)
|
||||
mod = my_import(parent, mod_name)
|
||||
if mod:
|
||||
modules.append(mod)
|
||||
else:
|
||||
# try app.DIR.script import
|
||||
for subdir in subdirs:
|
||||
mod = my_import(subdir, script)
|
||||
if mod:
|
||||
modules.append(mod)
|
||||
|
||||
return modules
|
||||
|
||||
if options['script_args']:
|
||||
script_args = options['script_args']
|
||||
else:
|
||||
script_args = []
|
||||
|
||||
# first pass to check if all scripts can be found
|
||||
script_to_run = []
|
||||
for script in scripts:
|
||||
script_modules = find_modules_for_script(script)
|
||||
if not script_modules:
|
||||
self.last_exit_code = 1
|
||||
if verbosity > 0 and not silent:
|
||||
print(ERROR("No (valid) module for script '%s' found" % script))
|
||||
continue
|
||||
script_to_run.extend(script_modules)
|
||||
|
||||
if self.last_exit_code:
|
||||
if verbosity < 2 and not silent:
|
||||
print(ERROR("Try running with a higher verbosity level like: -v2 or -v3"))
|
||||
if not continue_on_error:
|
||||
script_to_run = []
|
||||
|
||||
for script_mod in script_to_run:
|
||||
if verbosity > 1:
|
||||
print(NOTICE2("Running script '%s' ..." % script_mod.__name__))
|
||||
run_script(script_mod, *script_args)
|
||||
|
||||
if self.last_exit_code != 0:
|
||||
if silent:
|
||||
if hasattr(self, 'running_tests'):
|
||||
return
|
||||
sys.exit(self.last_exit_code)
|
||||
|
||||
try:
|
||||
raise CommandError("An error has occurred running scripts. See errors above.", returncode=self.last_exit_code)
|
||||
except TypeError:
|
||||
# Django < 3.1 fallback
|
||||
if self.last_exit_code == 1:
|
||||
# if exit_code is 1 we can still raise CommandError without returncode argument
|
||||
raise CommandError("An error has occurred running scripts. See errors above.")
|
||||
print(ERROR("An error has occurred running scripts. See errors above."))
|
||||
if hasattr(self, 'running_tests'):
|
||||
return
|
||||
sys.exit(self.last_exit_code)
|
||||
Reference in New Issue
Block a user