# Mirror of https://github.com/welton89/RRBEC.git
# Synced 2026-04-06 14:04:12 +00:00
# 175 lines, 6.6 KiB, Python
# -*- coding: utf-8 -*-
|
|
import json
|
|
from operator import itemgetter
|
|
from pathlib import Path
|
|
|
|
from django.core.management import call_command
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.db import DEFAULT_DB_ALIAS, connections
|
|
from django.db.migrations.loader import MigrationLoader
|
|
from django.db.migrations.recorder import MigrationRecorder
|
|
from django.utils import timezone
|
|
|
|
from django_extensions.management.utils import signalcommand
|
|
|
|
# File that states are read from and written to when -f/--filename is not given.
DEFAULT_FILENAME = 'managestate.json'
# State name used when the optional positional "state" argument is omitted.
DEFAULT_STATE = 'default'
|
|
|
|
|
|
class Command(BaseCommand):
    """Save the applied migrations of a database to a JSON file, or re-apply them.

    The ``dump`` action stores, under a named state, one migration name per
    migrated app. The ``load`` action reads a named state back and calls the
    ``migrate`` command for every app whose recorded migration differs from
    the one currently applied.
    """

    help = 'Manage database state in the convenient way.'
    conn = database = None
    # Cache for get_applied_migrations(); None means "not computed yet".
    _applied_migrations = None
    # Extra positional arguments captured by handle(*args), forwarded to "migrate".
    migrate_args: tuple
    # Remaining parsed options captured by handle(**options), forwarded to "migrate".
    migrate_options: dict
    filename: str
    verbosity: int

    def add_arguments(self, parser):
        """Register this command's own arguments plus pass-through "migrate" flags."""
        parser.add_argument(
            'action', choices=('dump', 'load'),
            help='An action to do. '
                 'Dump action saves applied migrations to a file. '
                 'Load action applies migrations specified in a file.',
        )
        parser.add_argument(
            'state', nargs='?', default=DEFAULT_STATE,
            help=f'A name of a state. Usually a name of a git branch. Defaults to "{DEFAULT_STATE}"',
        )
        parser.add_argument(
            '-d', '--database', default=DEFAULT_DB_ALIAS,
            help=f'Nominates a database to synchronize. Defaults to the "{DEFAULT_DB_ALIAS}" database.',
        )
        parser.add_argument(
            '-f', '--filename', default=DEFAULT_FILENAME,
            help=f'A file to write to. Defaults to "{DEFAULT_FILENAME}"',
        )

        # migrate command arguments
        parser.add_argument(
            '--noinput', '--no-input', action='store_false', dest='interactive',
            help='The argument for "migrate" command. '
                 'Tells Django to NOT prompt the user for input of any kind.',
        )
        parser.add_argument(
            '--fake', action='store_true',
            help='The argument for "migrate" command. '
                 'Mark migrations as run without actually running them.',
        )
        parser.add_argument(
            '--fake-initial', action='store_true',
            help='The argument for "migrate" command. '
                 'Detect if tables already exist and fake-apply initial migrations if so. Make sure '
                 'that the current database schema matches your initial migration before using this '
                 'flag. Django will only check for an existing table name.',
        )
        parser.add_argument(
            '--plan', action='store_true',
            help='The argument for "migrate" command. '
                 'Shows a list of the migration actions that will be performed.',
        )
        parser.add_argument(
            '--run-syncdb', action='store_true',
            help='The argument for "migrate" command. '
                 'Creates tables for apps without migrations.',
        )
        parser.add_argument(
            '--check', action='store_true', dest='check_unapplied',
            help='The argument for "migrate" command. '
                 'Exits with a non-zero status if unapplied migrations exist.',
        )

    @signalcommand
    def handle(self, action, database, filename, state, *args, **options):
        """Store the parsed arguments on the command and run the requested action.

        ``action`` is looked up as a method name on this command; the parser
        restricts it to "dump" or "load".
        """
        self.migrate_args = args
        self.migrate_options = options
        self.verbosity = options['verbosity']
        self.conn = connections[database]
        self.database = database
        self.filename = filename
        getattr(self, action)(state)

    def dump(self, state: str):
        """Save applied migrations to a file."""
        # Start with every migrated app at "zero", then overlay the apps that
        # actually have applied migrations, so unapplied apps are recorded too.
        migrated_apps = self.get_migrated_apps()
        migrated_apps.update(self.get_applied_migrations())
        self.write({state: migrated_apps})
        self.stdout.write(self.style.SUCCESS(
            f'Migrations for state "{state}" have been successfully saved to {self.filename}.'
        ))

    def load(self, state: str):
        """Apply migrations from a file.

        Raises:
            CommandError: if the file is missing (raised by ``read``) or holds
                no saved state under the name ``state``.
        """
        migrations = self.read().get(state)
        # The file also holds a non-state entry: ``write`` stores a string under
        # "updated_at". Anything that is not a mapping is not a saved state.
        if not isinstance(migrations, dict):
            raise CommandError(f'No such state saved: {state}')

        kwargs = {
            **self.migrate_options,
            'database': self.database,
            # Run "migrate" one verbosity level below this command, never below 0.
            'verbosity': max(self.verbosity - 1, 0),
        }

        for app, migration in migrations.items():
            if self.is_applied(app, migration):
                continue

            if self.verbosity > 1:
                self.stdout.write(self.style.WARNING(f'Applying migrations for "{app}"'))
            args = (app, migration, *self.migrate_args)
            call_command('migrate', *args, **kwargs)

        self.stdout.write(self.style.SUCCESS(
            f'Migrations for "{state}" have been successfully applied.'
        ))

    def get_migrated_apps(self) -> dict:
        """Installed apps having migrations.

        Returns:
            A new dict mapping every app reported by ``MigrationLoader`` as
            migrated to the placeholder migration name ``'zero'``.
        """
        apps = MigrationLoader(self.conn).migrated_apps
        migrated_apps = dict.fromkeys(apps, 'zero')
        if self.verbosity > 1:
            self.stdout.write('Apps having migrations: ' + ', '.join(sorted(migrated_apps)))
        return migrated_apps

    def get_applied_migrations(self) -> dict:
        """Installed apps with last applied migrations.

        The result is computed once and cached on the instance.

        Returns:
            A dict built from the recorder's applied-migration keys, which are
            assumed to be ``(app, migration_name)`` pairs. The keys are sorted
            by migration name, so each app ends up mapped to its greatest
            applied migration name in sort order.
        """
        # Compare against None: an empty result is a valid cached value and
        # must not trigger another query on every call.
        if self._applied_migrations is not None:
            return self._applied_migrations

        migrations = MigrationRecorder(self.conn).applied_migrations()
        # dict() keeps the last value seen for a key, i.e. the highest-sorted name.
        last_applied = sorted(migrations, key=itemgetter(1))

        self._applied_migrations = dict(last_applied)
        return self._applied_migrations

    def is_applied(self, app: str, migration: str) -> bool:
        """Check whether a migration for an app is applied or not.

        Returns:
            True only when ``migration`` equals the last applied migration
            recorded for ``app``; False otherwise, including when the app has
            no applied migrations.
        """
        applied = self.get_applied_migrations().get(app)
        if applied != migration:
            return False

        if self.verbosity > 1:
            self.stdout.write(self.style.WARNING(
                f'Migrations for "{app}" are already applied.'
            ))
        return True

    def read(self) -> dict:
        """Get saved state from the file.

        Raises:
            CommandError: if ``self.filename`` is not an existing regular file.
        """
        path = Path(self.filename)
        # is_file() is already False for a missing path, so one check suffices.
        if not path.is_file():
            raise CommandError(f'No such file: {self.filename}')

        with path.open(encoding='utf-8') as file:
            return json.load(file)

    def write(self, data: dict):
        """Write new data to the file using existent one.

        Existing file content is kept and updated with ``data``. The top-level
        "updated_at" key is always overwritten with the current time.
        """
        try:
            saved = self.read()
        except CommandError:
            # No readable file yet: start from an empty document.
            saved = {}

        saved.update(data, updated_at=str(timezone.now()))
        with open(self.filename, 'w', encoding='utf-8') as file:
            json.dump(saved, file, indent=2, sort_keys=True)
|