Wprowadzenie:
Azure Databricks jest doskonałą platformą do wykonania analiz , czy procesowania znacznej ilości danych tzw. Big Data. Jednak nie jest to jeszcze do końca dojrzała platforma. Przez co często pojawiają się problemy związane np. z procesem migracji danych/klastrów/jobów/kluczy/notebooków itp. do innych środowisk (takich jak Dev/Test/QA). Aby tego dokonać każdy ze wspomnianych komponentów migruje się w inny sposób. Jest jednak narzędzie przygotowane przez Databricks Labs, które ułatwia ten proces.
Pamiętajmy że Databricks jest platformą skupioną na podejściu API First, czyli że najpierw implementowany jest kod używany w API, a następnie dobudowywane są elementy takie jak GUI , klasy w python/cli czy innych językach. Dzięki temu mamy sami możemy zbudować/rozbudować narzędzia migracyjne poszczególnych komponentów znając tylko definicję API.
Github:
Kod migracji platformy Databricks
Na stronie w readme znajduje się dość rozbudowana dokumentacja jak używać kodu Python. My jednak dodamy coś od siebie, tzn. opakujemy metody export/import w jedną klase do migracji konkretnych zasobów.
Implementacja w Azure Databricks:
Główne pliki wywoływane w oryginalnej implementacji to export_db.py oraz import_db.py. Czyli operacje export/import są niezależne od siebie, co za tym idzie aby wykonać migrację pomiędzy środowiskami, należy wykonać serię skryptów aby dane najpierw wyeksportować a następnie zaimportować. My przygotowaliśmy migrate_db.py który wykonuje odpowiednie operacje w kolejności, aby móc z migrować cały workspace. Opis tego procesu jak również dokładne kody implementacji rozbijemy na kilka artykułów. W pierwszym z nich skupimy się na użytkownikach i klastrach.
Zaczynamy od definicji migrate_db.py
from dbclient import *
from timeit import default_timer as timer
from datetime import timedelta, datetime
from os import makedirs
# python 3.6
def main():
my_parser = get_migrate_parser()
args = my_parser.parse_args()
import_login_args = get_login_credentials(profile=args.import_profile)
export_login_args = get_login_credentials(profile=args.export_profile)
if is_azure_creds(import_login_args) and (not args.azure):
raise ValueError('Login credentials do not match args. Please provide --azure flag for azure envs.')
if is_azure_creds(export_login_args) and (not args.azure):
raise ValueError('Login credentials do not match args. Please provide --azure flag for azure envs.')
import_url = import_login_args['host']
import_token = import_login_args['token']
export_url = export_login_args['host']
export_token = export_login_args['token']
import_client_config = build_client_config(import_url, import_token, args)
export_client_config = build_client_config(export_url, export_token, args)
makedirs(export_client_config['export_dir'], exist_ok=True)
if export_client_config['debug']:
print(url, token)
now = str(datetime.now())
if args.all:
#kod migracji elementów
if __name__ == '__main__':
main()
Kod wywołania:
python migrate_db.py --export_profile <profil źródła> --import_profile <profil celu> --all --bypass-windows-check --azure
Doszły więc dwa nowe parametry
export_profile, import_profile
Parametry –bypass-windows-check i –azure są specyficzne dla środowiska na którym wykonywana jest migracja.
Migracja użytkowników:
Aby z migrować użytkowników przygotowaliśmy poniższy kod w if args.all:
if args.users:
print("Export all users and groups at {0}".format(now))
e_scim_c = ScimClient(export_client_config)
start = timer()
# log all users
e_scim_c.log_all_users()
end = timer()
print("Complete Users Export Time: " + str(timedelta(seconds=end - start)))
start = timer()
# log all groups
e_scim_c.log_all_groups()
end = timer()
print("Complete Group Export Time: " + str(timedelta(seconds=end - start)))
print("Import all users and groups at {0}".format(now))
i_scim_c = ScimClient(import_client_config)
start = timer()
i_scim_c.import_all_users_and_groups()
end = timer()
print("Complete Users and Groups Import Time: " + str(timedelta(seconds=end - start)))if args.users:
print("Export all users and groups at {0}".format(now))
e_scim_c = ScimClient(export_client_config)
start = timer()
# log all users
e_scim_c.log_all_users()
end = timer()
print("Complete Users Export Time: " + str(timedelta(seconds=end - start)))
start = timer()
# log all groups
e_scim_c.log_all_groups()
end = timer()
print("Complete Group Export Time: " + str(timedelta(seconds=end - start)))
print("Import all users and groups at {0}".format(now))
i_scim_c = ScimClient(import_client_config)
start = timer()
i_scim_c.import_all_users_and_groups()
end = timer()
print("Complete Users and Groups Import Time: " + str(timedelta(seconds=end - start)))
Brak zmian w pozostałych klasach, więc zwykłe połączenie export/import wystarczy w tym przypadku.
Migracja klastrów:
Aby z migrować użytkowników przygotowaliśmy poniższy kod w if args.all:
if args.clusters:
print("Export the cluster configs at {0}".format(now))
e_cl_c = ClustersClient(export_client_config)
start = timer()
# log the cluster json
e_cl_c.log_cluster_configs()
e_cl_c.log_cluster_policies()
end = timer()
print("Complete Cluster Export Time: " + str(timedelta(seconds=end - start)))
# log the instance pools
print("Start instance pool logging ...")
start = timer()
e_cl_c.log_instance_pools()
end = timer()
print("Complete Instance Pools Export Time: " + str(timedelta(seconds=end - start)))
print("Import all cluster configs at {0}".format(now))
i_cl_c = ClustersClient(import_client_config)
print("Start import of instance pool configurations ...")
start = timer()
i_cl_c.import_instance_pools()
i_cl_c.log_instance_pools(log_file='instance_pools_new.log')
end = timer()
print("Complete Instance Pools Creation Time: " + str(timedelta(seconds=end - start)))
print("Start import of cluster policies ...")
start = timer()
i_cl_c.import_cluster_policies(log_pool_file='instance_pools_new.log')
end = timer()
print("Complete Cluster Policies Creation Time: " + str(timedelta(seconds=end - start)))
print("Start import of cluster configurations ...")
start = timer()
i_cl_c.import_cluster_configs()
end = timer()
print("Complete Cluster Import Time: " + str(timedelta(seconds=end - start)))
W tym miejscu natrafiamy jednak na problem. Jeśli mamy klastry zdefiniowane używając cluster pools lub cluster policies , natrafimy na problem migracji ID tych obiektów do nowego środowiska. Gdyż, w nowym środowisku te elementy dostają nowe unikalne identyfikatory. Aby więc poprawnie przenieść te elementy musimy zachować kolejność tworzenia i przenoszenia obiektów, oraz zbudować mapę migracji ID źródłowych do ID docelowych.
Kroki:
- eksport konfiguracji cluster, cluster pool, cluster policies
- import zaczynamy od cluster pools, jednocześnie eksportując utworzone w tym kroku pool-e z nowego docelowego środowiska
- następnie import cluster policies dodając jako argument nowe cluster pool ustworzone krok więcej
- następnie import cluster
Modyfikacje import cluster policies:
Poniżej znajduje się kod zmodyfikowany w klasie ClustersClient:
def import_cluster_policies(self, log_file='cluster_policies.log', acl_log_file='acl_cluster_policies.log', log_pool_file='instance_pools.log'):
policies_log = self.get_export_dir() + log_file
acl_policies_log = self.get_export_dir() + acl_log_file
policies_pool_log = self.get_export_dir() + log_pool_file
pool_map = dict()
if os.path.exists(policies_pool_log):
with open(policies_pool_log, 'r') as policy_pool_fp:
for pp in policy_pool_fp:
policy_pool_conf = json.loads(pp)
print(policy_pool_conf['default_tags']['DatabricksInstancePoolId']+':'+policy_pool_conf['custom_tags']['orig_instance_pool_id'])
pool_map[policy_pool_conf['custom_tags']['orig_instance_pool_id']] = policy_pool_conf['default_tags']['DatabricksInstancePoolId']
# create the policies
if os.path.exists(policies_log):
with open(policies_log, 'r') as policy_fp:
for p in policy_fp:
policy_conf = json.loads(p)
# when creating the policy, we only need `name` and `definition` fields
for ld in pool_map.keys():
policy_conf['definition'] = policy_conf['definition'].replace(ld,pool_map[ld])
#print(policy_conf['definition'])
create_args = {'name': policy_conf['name'],
'definition': policy_conf['definition']}
resp = self.post('/policies/clusters/create', create_args)
# ACLs are created by using the `access_control_list` key
with open(acl_policies_log, 'r') as acl_fp:
id_map = self.get_policy_id_by_name_dict()
for x in acl_fp:
p_acl = json.loads(x)
acl_create_args = {'access_control_list': self.build_acl_args(p_acl['access_control_list'])}
policy_id = id_map[p_acl['name']]
api = f'/permissions/cluster-policies/{policy_id}'
resp = self.put(api, acl_create_args)
print(resp)
else:
print('Skipping cluster policies as no log file exists')
Polityki klastrów zdefiniowane są przy pomocy Json-a a jednym z argumentów jest ID cluster pool, jeśli więc nie istnieje dany cluster pool w nowym środowisku , API zwróci błąd. Musimy go więc przemapować. Znacznie lepszym rozwiązaniem było by tutaj posłużenie się nazwą cluster pool które pomiędzy środowiskami nie ulegnie zmianie.
Podsumowanie:
Za sobą mamy pierwsze modyfikacje kodu Databricks Labs, jednak nie jedyne. Dzięki tym modyfikacjom możemy zmigrować klastry i użytkowników wykonując poniższą komendę.
python migrate_db.py --export_profile <profil źródła> --import_profile <profil celu> --all --users --clusters --bypass-windows-check --azure
Dalsze modyfikacje niedługo.
Comments are closed.