전체 글
-
[Airflow] Composer에서 worker 숫자가 늘지 않았던 일개발 끄적 2022. 4. 12. 21:41
문제 상황 회사에서 GCP에서 프로젝트를 하고 있다. Airflow가 필요하니 Cloud Composer를 사용하고 있는데 당연히 병렬 처리가 필요하다. Composer 2는 Autoscaling을 지원하기에 Worker 수를 최소 1개, 최대 10개로 지정해놨다. 그런데 문제는 모니터링해보니 아무리 작업을 추가해도 Worker 수가 1개에서 늘지가 않는다! 심지어 메모리가 부족해서 fail이 발생하는데도 Worker 수는 늘리지 않는다. 삽질 이런 저런 추론을 해봤다. 1. Celery Executor 설정이 안된건 아닐까? 내가 알기론 Sequential Executor가 Airflow default값이고, 병렬 실행을 지원하지 않는것으로 알고있다. 확인해보니 Composer에서는 기본으로 Celer..
-
[pandas] ==와 is는 같지 않으니 주의하자개발 끄적 2022. 4. 12. 00:44
데이터를 다루는 사람이라면 Jupyter Notebook에서 실험을 많이 한다. 그리고 제대로 돌리기 위해서 python 코드로 옮기는 작업을 하곤 한다. ipynb는 스크립트니 그대로 옮기진 않게 되는데 이런 과정에서 많은 에러를 유발하곤 한다. notebook과는 다르게 데이터 프레임이 넘어가는게 중간중간 보이지 않다보니 디버깅이 쉽진 않다. 문제 상황 분명 똑같이 옮겼다고 생각했는데 에러가 발생했다. 에러가 발생한 구문은 아래와 같다. df = df[df['age'].isna() is False] 내가 가진 데이터에서 age가 null이 아닌 row만 남기고 싶었다. 나이스하진 않은 방식같긴 하다만 일단 그건 논외고.. 에러는 KeyError이였다. (raise KeyError(key) from e..
-
[Airflow] sub dagLearn/Airflow 2022. 4. 7. 23:09
sub dag dag를 만들어 쓰다보니 잘쓰고 싶은 욕심이 들게된다. 그러다보니 점점 task를 잘게 쪼개고 병렬화도 많이 하게 된다. 나는 Graph형태로 자주 보는편인데 문제는 너무 복잡해진다. 이 때 sub dag를 쓰면 좋다. 메인 dag에서는 sub dag로 묶어서 보이니 간결해진다. 써보자 Sub Dag를 만드는건 기존의 Dag를 만드는 것과 다르지 않다. Dag를 만들고 메인 Dag에서 SubDagOperator를 쓰면 Sub Dag가 된다. 추후에 빨리 참고하기위해 코드를 남긴다만, 사용법은 너무도 다양하다. 아래처럼 작성하면 prepare_train이 끝나면 5개의 train이 병렬로 실행된다. (지금보면 별것 아니지만 Airflow 사용이 미숙해서 병렬로 실행하는 것도 한참 삽질했다....
-
[BigQuery] EXECUTE IMMEDIATE가 이상해요개발 끄적 2022. 4. 6. 21:38
문제 상황 동료분이 Hue에서 사용하던 쿼리를 python api로 날리던 중 참으로 이상한 일이 있었다. 바로 EXECUTE IMMEDIATE 문법을 사용한 구문들이 400 에러를 발생시키는 것이었다. (Access denied) 정확히는 Hue에서는 잘 동작하고 Superset, python api에서는 잘 동작하지 않았다. EXECUTE IMMEDIATE? 이 문법을 안써본 사람도 아래 예시를 보면 이해가 바로 될 것이다. 쉽게 설명하자면 String형태로 정의된 쿼리를 넘겨주면 실행시켜주는 것이다. 자세한 용법은 구글 문서가 잘 정리되어있으니 고고 이게 외않되? 아래 세 개의 예시를 보자. 1. 단순한 SELECT문을 EXECUTE IMMEDIATE에 넘겼을 뿐이다. 동작하지 않는다. (Acces..
-
[BigQuery] Backtick 사용에 대한 고찰개발 끄적 2022. 4. 5. 23:40
Backtick 개요 BigQuery의 테이블 주소는 아래와 같이 세 구역으로 구성되어있다. my-project.my_dataset.my_table 프로젝트 (점) 데이터셋 (점) 테이블 위 처럼 프로젝트 이름에는 대시(-)를 쓸 수 있고 데이터셋과 테이블엔 대시는 안되고 언더바(_)만 허용된다. 문제 상황 그간 BigQuery를 쓰면서 사실 누가 딱히 알려준건 아닌데 어느순간부터 백틱을 써왔다. 누가 가르쳐준 것도 아니고.. 나도 왜인지 기억이 안난다만 습관적으로 아래와 같이 써오고 있었다. `my-project.my_dataset`.my_table 최근에 몇 가지 문제가 생겨서 백틱에 대해서 제대로 알아보게 되었다. 일단 만나게 된 문제는 1. 다른 사람들의 쿼리를 보다보니 백틱 사용이 다들 제각각이..
-
5. Airflow variablesLearn/Airflow 2021. 11. 19. 21:07
조만간 업무상 bigquery에서 airflow를 쓰게될 것 같다. 그땐 아래 링크를 보고.. 일단 variable로 jump https://youtu.be/wAyu5BN3VpY Airflow에서는 key-value 쌍으로 된 variable들을 설정할 수 있다. 물론 아래와 같이 직접 DAG 코드에 작성할 수 있지만 이렇게 사용하지 말기를 권장한다. # Do not use variables like this var1 = "value1" var2 = [1, 2, 3] var3 = {'k': 'value3'} 그럼 어떻게 쓰는게 좋은건가? Web UI에서 Variable에 key-value를 등록해서 사용하길 권장한다. 다만 주의할 점은, 이 Variable들은 Airflow Metadata DB에 저장되..
-
4. Airflow conceptLearn/Airflow 2021. 11. 14. 18:04
DAG (Directed Acyclic Graph) 돌릴 task를 모아놓은걸 DAG라고 부른다. 노드간에 진행 방향이 있는 그래프를 directed graph라고 부르고 순환형이 아닌 것을 Acyclic graph라고 부른다. operator task가 어떤 행동을 할 것인지를 operator라고 부른다. operator들은 이미 많은 것들이 구현되어있고 계속해서 개발되고 있다. operator에는 세 가지 유형이 있다. ① Sensors 특정 조건이 만족될 때까지 계속 돌고 있는 operator 조건의 예시로 특정 시간을 기다린다던가 무언가 파일이 올때까지 기다리는 것이 있다. ② Operators 특정 행위를 실행시키는 operator BashOperator, PythonOperator 이런 것들이..
-
3. pipeline을 만들어보자 (+execution_date)Learn/Airflow 2021. 11. 11. 23:26
설치까지 했으면 이제 파이프라인을 만들어볼 차례다. 흘려보낼 데이터가 없어서 실습이 고민되지만 일단 한 단계씩 따라가보자. 파이프라인을 만드는 작업은 5단계로 나눠볼 수 있다. Step 1. Importing modules 필요한 라이브러리를 import하자. from datetime import timedelta import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator Step 2. Default Arguments dictionary형태로 기본 arg를 작성하자. 여기서 주의해야할건 시간이다. airflow의 시간 개념을 알고가야한다. 아래에서 start_date는 실행되는 날짜가 아니다...