# -*- coding: utf-8 -*- import os import sys import importlib import inspect import traceback from argparse import ArgumentTypeError from django.apps import apps from django.conf import settings from django.core.management.base import CommandError from django_extensions.management.email_notifications import EmailNotificationCommand from django_extensions.management.utils import signalcommand class DirPolicyChoices: NONE = 'none' EACH = 'each' ROOT = 'root' def check_is_directory(value): if value is None or not os.path.isdir(value): raise ArgumentTypeError("%s is not a directory!" % value) return value class BadCustomDirectoryException(Exception): def __init__(self, value): self.message = value + ' If --dir-policy is custom than you must set correct directory in ' \ '--dir option or in settings.RUNSCRIPT_CHDIR' def __str__(self): return self.message class Command(EmailNotificationCommand): help = 'Runs a script in django context.' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.current_directory = os.getcwd() self.last_exit_code = 0 def add_arguments(self, parser): super().add_arguments(parser) parser.add_argument('script', nargs='+') parser.add_argument( '--fixtures', action='store_true', dest='infixtures', default=False, help='Also look in app.fixtures subdir', ) parser.add_argument( '--noscripts', action='store_true', dest='noscripts', default=False, help='Do not look in app.scripts subdir', ) parser.add_argument( '-s', '--silent', action='store_true', dest='silent', default=False, help='Run silently, do not show errors and tracebacks. Also implies --continue-on-error.', ) parser.add_argument( '-c', '--continue-on-error', action='store_true', dest='continue_on_error', default=False, help='Continue executing other scripts even though one has failed. ' 'It will print a traceback unless --no-traceback or --silent are given ' 'The exit code used when terminating will always be 1.', ) parser.add_argument( '--no-traceback', action='store_true', dest='no_traceback', default=False, help='Do not show tracebacks', ) parser.add_argument( '--script-args', nargs='*', type=str, help='Space-separated argument list to be passed to the scripts. Note that the ' 'same arguments will be passed to all named scripts.', ) parser.add_argument( '--dir-policy', type=str, choices=[DirPolicyChoices.NONE, DirPolicyChoices.EACH, DirPolicyChoices.ROOT], help='Policy of selecting scripts execution directory: ' 'none - start all scripts in current directory ' 'each - start all scripts in their directories ' 'root - start all scripts in BASE_DIR directory ', ) parser.add_argument( '--chdir', type=check_is_directory, help='If dir-policy option is set to custom, than this option determines script execution directory.', ) @signalcommand def handle(self, *args, **options): NOTICE = self.style.SQL_TABLE NOTICE2 = self.style.SQL_FIELD ERROR = self.style.ERROR ERROR2 = self.style.NOTICE subdirs = [] scripts = options['script'] if not options['noscripts']: subdirs.append(getattr(settings, 'RUNSCRIPT_SCRIPT_DIR', 'scripts')) if options['infixtures']: subdirs.append('fixtures') verbosity = options["verbosity"] show_traceback = options['traceback'] no_traceback = options['no_traceback'] continue_on_error = options['continue_on_error'] if no_traceback: show_traceback = False else: show_traceback = True silent = options['silent'] if silent: verbosity = 0 continue_on_error = True email_notifications = options['email_notifications'] if len(subdirs) < 1: print(NOTICE("No subdirs to run left.")) return if len(scripts) < 1: print(ERROR("Script name required.")) return def get_directory_from_chdir(): directory = options['chdir'] or getattr(settings, 'RUNSCRIPT_CHDIR', None) try: check_is_directory(directory) except ArgumentTypeError as e: raise BadCustomDirectoryException(str(e)) return directory def get_directory_basing_on_policy(script_module): policy = options['dir_policy'] or getattr(settings, 'RUNSCRIPT_CHDIR_POLICY', DirPolicyChoices.NONE) if policy == DirPolicyChoices.ROOT: return settings.BASE_DIR elif policy == DirPolicyChoices.EACH: return os.path.dirname(inspect.getfile(script_module)) else: return self.current_directory def set_directory(script_module): if options['chdir']: directory = get_directory_from_chdir() elif options['dir_policy']: directory = get_directory_basing_on_policy(script_module) elif getattr(settings, 'RUNSCRIPT_CHDIR', None): directory = get_directory_from_chdir() else: directory = get_directory_basing_on_policy(script_module) os.chdir(os.path.abspath(directory)) def run_script(mod, *script_args): exit_code = None try: set_directory(mod) exit_code = mod.run(*script_args) if isinstance(exit_code, bool): # convert boolean True to exit-code 0 and False to exit-code 1 exit_code = 1 if exit_code else 0 if isinstance(exit_code, int): if exit_code != 0: try: raise CommandError("'%s' failed with exit code %s" % (mod.__name__, exit_code), returncode=exit_code) except TypeError: raise CommandError("'%s' failed with exit code %s" % (mod.__name__, exit_code)) if email_notifications: self.send_email_notification(notification_id=mod.__name__) except Exception as e: if isinstance(e, CommandError) and hasattr(e, 'returncode'): exit_code = e.returncode self.last_exit_code = exit_code if isinstance(exit_code, int) else 1 if silent: return if verbosity > 0: print(ERROR("Exception while running run() in '%s'" % mod.__name__)) if continue_on_error: if show_traceback: traceback.print_exc() return if email_notifications: self.send_email_notification(notification_id=mod.__name__, include_traceback=True) if no_traceback: raise CommandError(repr(e)) raise def my_import(parent_package, module_name): full_module_path = "%s.%s" % (parent_package, module_name) if verbosity > 1: print(NOTICE("Check for %s" % full_module_path)) # Try importing the parent package first try: importlib.import_module(parent_package) except ImportError as e: if str(e).startswith('No module named'): # No need to proceed if the parent package doesn't exist return False try: t = importlib.import_module(full_module_path) except ImportError as e: # The parent package exists, but the module doesn't try: if importlib.util.find_spec(full_module_path) is None: return False except Exception: module_file = os.path.join(settings.BASE_DIR, *full_module_path.split('.')) + '.py' if not os.path.isfile(module_file): return False if silent: return False if show_traceback: traceback.print_exc() if verbosity > 0: print(ERROR("Cannot import module '%s': %s." % (full_module_path, e))) return False if hasattr(t, "run"): if verbosity > 1: print(NOTICE2("Found script '%s' ..." % full_module_path)) return t else: if verbosity > 1: print(ERROR2("Found script '%s' but no run() function found." % full_module_path)) def find_modules_for_script(script): """ Find script module which contains 'run' attribute """ modules = [] # first look in apps for app in apps.get_app_configs(): for subdir in subdirs: mod = my_import("%s.%s" % (app.name, subdir), script) if mod: modules.append(mod) # try direct import if script.find(".") != -1: parent, mod_name = script.rsplit(".", 1) mod = my_import(parent, mod_name) if mod: modules.append(mod) else: # try app.DIR.script import for subdir in subdirs: mod = my_import(subdir, script) if mod: modules.append(mod) return modules if options['script_args']: script_args = options['script_args'] else: script_args = [] # first pass to check if all scripts can be found script_to_run = [] for script in scripts: script_modules = find_modules_for_script(script) if not script_modules: self.last_exit_code = 1 if verbosity > 0 and not silent: print(ERROR("No (valid) module for script '%s' found" % script)) continue script_to_run.extend(script_modules) if self.last_exit_code: if verbosity < 2 and not silent: print(ERROR("Try running with a higher verbosity level like: -v2 or -v3")) if not continue_on_error: script_to_run = [] for script_mod in script_to_run: if verbosity > 1: print(NOTICE2("Running script '%s' ..." % script_mod.__name__)) run_script(script_mod, *script_args) if self.last_exit_code != 0: if silent: if hasattr(self, 'running_tests'): return sys.exit(self.last_exit_code) try: raise CommandError("An error has occurred running scripts. See errors above.", returncode=self.last_exit_code) except TypeError: # Django < 3.1 fallback if self.last_exit_code == 1: # if exit_code is 1 we can still raise CommandError without returncode argument raise CommandError("An error has occurred running scripts. See errors above.") print(ERROR("An error has occurred running scripts. See errors above.")) if hasattr(self, 'running_tests'): return sys.exit(self.last_exit_code)