SCRIPT FACTORY | BI BLOG SCRIPT FACTORY | BI BLOG
INTELIGENTNA PRZYSZŁOŚĆ ZACZYNA SIĘ TU
SCRIPT FACTORY | BI BLOG SCRIPT FACTORY | BI BLOG
lut 18

Migracja zasobów Azure Databricks cz. 1

  • 18 lutego 2022
  • Wojciech Zduniak
  • Azure, Databricks, Tricks & Tips

Wprowadzenie:

Azure Databricks jest doskonałą platformą do wykonania analiz , czy procesowania znacznej ilości danych tzw. Big Data. Jednak nie jest to jeszcze do końca dojrzała platforma. Przez co często pojawiają się problemy związane np. z procesem migracji danych/klastrów/jobów/kluczy/notebooków itp. do innych środowisk (takich jak Dev/Test/QA). Aby tego dokonać każdy ze wspomnianych komponentów migruje się w inny sposób. Jest jednak narzędzie przygotowane przez Databricks Labs, które ułatwia ten proces.

Pamiętajmy że Databricks jest platformą skupioną na podejściu API First, czyli że najpierw implementowany jest kod używany w API, a następnie dobudowywane są elementy takie jak GUI , klasy w python/cli czy innych językach. Dzięki temu mamy sami możemy zbudować/rozbudować narzędzia migracyjne poszczególnych komponentów znając tylko definicję API.

lista API

Github:

Kod migracji platformy Databricks

Na stronie w readme znajduje się dość rozbudowana dokumentacja jak używać kodu Python. My jednak dodamy coś od siebie, tzn. opakujemy metody export/import w jedną klase do migracji konkretnych zasobów.

Implementacja w Azure Databricks:

Główne pliki wywoływane w oryginalnej implementacji to export_db.py oraz import_db.py. Czyli operacje export/import są niezależne od siebie, co za tym idzie aby wykonać migrację pomiędzy środowiskami, należy wykonać serię skryptów aby dane najpierw wyeksportować a następnie zaimportować. My przygotowaliśmy migrate_db.py który wykonuje odpowiednie operacje w kolejności, aby móc z migrować cały workspace. Opis tego procesu jak również dokładne kody implementacji rozbijemy na kilka artykułów. W pierwszym z nich skupimy się na użytkownikach i klastrach.

Zaczynamy od definicji migrate_db.py

from dbclient import *
from timeit import default_timer as timer
from datetime import timedelta, datetime
from os import makedirs


# python 3.6
def main():
    my_parser = get_migrate_parser()
    args = my_parser.parse_args()

    import_login_args = get_login_credentials(profile=args.import_profile)
    export_login_args = get_login_credentials(profile=args.export_profile)

    if is_azure_creds(import_login_args) and (not args.azure):
        raise ValueError('Login credentials do not match args. Please provide --azure flag for azure envs.')

    if is_azure_creds(export_login_args) and (not args.azure):
        raise ValueError('Login credentials do not match args. Please provide --azure flag for azure envs.')
    import_url = import_login_args['host']
    import_token = import_login_args['token']
    export_url = export_login_args['host']
    export_token = export_login_args['token']
    
    import_client_config = build_client_config(import_url, import_token, args)
    export_client_config = build_client_config(export_url, export_token, args)

    makedirs(export_client_config['export_dir'], exist_ok=True)

    if export_client_config['debug']:
        print(url, token)
    now = str(datetime.now())

    if args.all:
		#kod migracji elementów 

if __name__ == '__main__':
    main()

Kod wywołania:

python migrate_db.py --export_profile <profil źródła> --import_profile <profil celu> --all --bypass-windows-check --azure

Doszły więc dwa nowe parametry

export_profile, import_profile

Parametry –bypass-windows-check i –azure są specyficzne dla środowiska na którym wykonywana jest migracja.

Migracja użytkowników:

Aby z migrować użytkowników przygotowaliśmy poniższy kod w if args.all:

if args.users:
    print("Export all users and groups at {0}".format(now))
    e_scim_c = ScimClient(export_client_config)
    start = timer()
    # log all users
    e_scim_c.log_all_users()
    end = timer()
    print("Complete Users Export Time: " + str(timedelta(seconds=end - start)))
    start = timer()
    # log all groups
    e_scim_c.log_all_groups()
    end = timer()
    print("Complete Group Export Time: " + str(timedelta(seconds=end - start)))
    
    print("Import all users and groups at {0}".format(now))
    i_scim_c = ScimClient(import_client_config)
    start = timer()
    i_scim_c.import_all_users_and_groups()
    end = timer()
    print("Complete Users and Groups Import Time: " + str(timedelta(seconds=end - start)))if args.users:
            print("Export all users and groups at {0}".format(now))
            e_scim_c = ScimClient(export_client_config)
            start = timer()
            # log all users
            e_scim_c.log_all_users()
            end = timer()
            print("Complete Users Export Time: " + str(timedelta(seconds=end - start)))
            start = timer()
            # log all groups
            e_scim_c.log_all_groups()
            end = timer()
            print("Complete Group Export Time: " + str(timedelta(seconds=end - start)))
            
            print("Import all users and groups at {0}".format(now))
            i_scim_c = ScimClient(import_client_config)
            start = timer()
            i_scim_c.import_all_users_and_groups()
            end = timer()
            print("Complete Users and Groups Import Time: " + str(timedelta(seconds=end - start)))

Brak zmian w pozostałych klasach, więc zwykłe połączenie export/import wystarczy w tym przypadku.

Migracja klastrów:

Aby z migrować użytkowników przygotowaliśmy poniższy kod w if args.all:

if args.clusters:
    print("Export the cluster configs at {0}".format(now))
    e_cl_c = ClustersClient(export_client_config)
    start = timer()
    # log the cluster json
    e_cl_c.log_cluster_configs()
    e_cl_c.log_cluster_policies()
    end = timer()
    print("Complete Cluster Export Time: " + str(timedelta(seconds=end - start)))
    # log the instance pools
    print("Start instance pool logging ...")
    start = timer()
    e_cl_c.log_instance_pools()
    end = timer()
    print("Complete Instance Pools Export Time: " + str(timedelta(seconds=end - start)))
                
    print("Import all cluster configs at {0}".format(now))
    i_cl_c = ClustersClient(import_client_config)
    print("Start import of instance pool configurations ...")
    start = timer()
    i_cl_c.import_instance_pools()
    i_cl_c.log_instance_pools(log_file='instance_pools_new.log')
    end = timer()
    print("Complete Instance Pools Creation Time: " + str(timedelta(seconds=end - start)))
    print("Start import of cluster policies ...")
    start = timer()
    i_cl_c.import_cluster_policies(log_pool_file='instance_pools_new.log')
    end = timer()
    print("Complete Cluster Policies Creation Time: " + str(timedelta(seconds=end - start)))
    print("Start import of cluster configurations ...")
    start = timer()
    i_cl_c.import_cluster_configs()
    end = timer()
    print("Complete Cluster Import Time: " + str(timedelta(seconds=end - start)))

W tym miejscu natrafiamy jednak na problem. Jeśli mamy klastry zdefiniowane używając cluster pools lub cluster policies , natrafimy na problem migracji ID tych obiektów do nowego środowiska. Gdyż, w nowym środowisku te elementy dostają nowe unikalne identyfikatory. Aby więc poprawnie przenieść te elementy musimy zachować kolejność tworzenia i przenoszenia obiektów, oraz zbudować mapę migracji ID źródłowych do ID docelowych.

Kroki:

  • eksport konfiguracji cluster, cluster pool, cluster policies
  • import zaczynamy od cluster pools, jednocześnie eksportując utworzone w tym kroku pool-e z nowego docelowego środowiska
  • następnie import cluster policies dodając jako argument nowe cluster pool ustworzone krok więcej
  • następnie import cluster

Modyfikacje import cluster policies:

Poniżej znajduje się kod zmodyfikowany w klasie ClustersClient:

def import_cluster_policies(self, log_file='cluster_policies.log', acl_log_file='acl_cluster_policies.log', log_pool_file='instance_pools.log'):
    policies_log = self.get_export_dir() + log_file
    acl_policies_log = self.get_export_dir() + acl_log_file
    policies_pool_log = self.get_export_dir() + log_pool_file
    pool_map = dict()
    if os.path.exists(policies_pool_log):
        with open(policies_pool_log, 'r') as policy_pool_fp:
            for pp in policy_pool_fp:
                policy_pool_conf = json.loads(pp)
                print(policy_pool_conf['default_tags']['DatabricksInstancePoolId']+':'+policy_pool_conf['custom_tags']['orig_instance_pool_id'])
                pool_map[policy_pool_conf['custom_tags']['orig_instance_pool_id']] = policy_pool_conf['default_tags']['DatabricksInstancePoolId']
    
    # create the policies
    if os.path.exists(policies_log):
        with open(policies_log, 'r') as policy_fp:
            for p in policy_fp:
                policy_conf = json.loads(p)
                # when creating the policy, we only need `name` and `definition` fields
                for ld in pool_map.keys():
                    policy_conf['definition'] = policy_conf['definition'].replace(ld,pool_map[ld])
                
                #print(policy_conf['definition'])
                create_args = {'name': policy_conf['name'],
                               'definition': policy_conf['definition']}
                resp = self.post('/policies/clusters/create', create_args)
        # ACLs are created by using the `access_control_list` key
        with open(acl_policies_log, 'r') as acl_fp:
            id_map = self.get_policy_id_by_name_dict()
            for x in acl_fp:
                p_acl = json.loads(x)
                acl_create_args = {'access_control_list': self.build_acl_args(p_acl['access_control_list'])}
                policy_id = id_map[p_acl['name']]
                api = f'/permissions/cluster-policies/{policy_id}'
                resp = self.put(api, acl_create_args)
                print(resp)
    else:
        print('Skipping cluster policies as no log file exists')

Polityki klastrów zdefiniowane są przy pomocy Json-a a jednym z argumentów jest ID cluster pool, jeśli więc nie istnieje dany cluster pool w nowym środowisku , API zwróci błąd. Musimy go więc przemapować. Znacznie lepszym rozwiązaniem było by tutaj posłużenie się nazwą cluster pool które pomiędzy środowiskami nie ulegnie zmianie.

Podsumowanie:

Za sobą mamy pierwsze modyfikacje kodu Databricks Labs, jednak nie jedyne. Dzięki tym modyfikacjom możemy zmigrować klastry i użytkowników wykonując poniższą komendę.

python migrate_db.py --export_profile <profil źródła> --import_profile <profil celu> --all --users --clusters --bypass-windows-check --azure

Dalsze modyfikacje niedługo.

  • Facebook
  • LinkedIn

About The Author

BI zajmuję się kilkanaście lat, od Microsoft po Teradatę, od on-premise do cloud, od pojedyńczych serwerów po klastrów obliczeniowych. Temat jest tak szeroki i ciekawy że postanowiłem opisać po kolei wszystko co wiem i z czym się zetknąłem. Mam nadzieje że będzie Wam się podobać ta lektura :)

Related Posts

  • Monitoring Azure Databricks9 lutego 2022
  • Undelete na dużej ilości plików4 lutego 2022
  • Jak rozpocząć pracę z Azure Databricks4 lutego 2022
  • Data Warehouse i przetwarzanie plików24 marca 2019

Comments are closed.

Kategorie

  • Apache Hadoop
    • Instalacja
  • Azure
    • Databricks
    • Log Analytics
    • SQL Data Warehouse
    • Storage
    • Synapse
  • DigitalOcena
  • Google
    • Big Query
    • Storage
  • Tricks & Tips

Najnowsze posty:

  • Migracja zasobów Azure Databricks cz. 1
  • Monitoring Azure Databricks
  • Undelete na dużej ilości plików
  • Jak rozpocząć pracę z Azure Databricks
  • Data Warehouse i przetwarzanie plików

Wyszukaj Wpis

© Copyright 2025 SCRIPT FACTORY | BI BLOG