안녕하세요 이정운 입니다.
이번에는 얼마전 대용량 BigQuery 성능을 테스트 하기 위하여 대량 데이터 로드를 쉽게 지원해주는 Embulk 라는 오픈 소스를 가지고 Oracle DB 에 있는 데이터를 배치성으로 BigQuery 로 밀어 넣는 과정을 수행한 적이 있는데 이에 대해서 짧게 정리하고 공유해 보도록 하겠습니다.
혹시 아직 BigQuery 를 모르는 분들을 위해서 간단히 이야기하면 BigQuery 는 Google의 페타바이트급 규모의 분석용 데이터 웨어하우스로 완전 관리형이라 인프라가 없으며(관리할 필요가 없다는 의미) 데이터 분석가들에 익숙한 SQL로 데이터를 분석해 의미 있는 정보를 찾는 데 집중할 수 있도록 특화된 아주 경제적인 Cloud 용 데이터 웨어하우스 입니다. ( https://cloud.google.com/bigquery/?hl=ko )
대용량 데이터를 가지고 처음으로 BigQuery 로 밀어넣다 보니 이런 저런 시행착오도 좀 거쳐봤고, ETL 도구로 오픈소스인 Embulk 도 처음 써보는 것인지라 생각했던 것보다는 더 오래 걸린 작업이지 않았나 합니다. 그래도 해당 테스트는 약 10억건이 넘는 데이터를 Oracle DB 에서 읽어서 다시 BigQuery 로 벌크 로딩하는 작업을 수행하였기 때문에 충분한 사이즈의 데이터를 가지고 테스트 해본거라 나름 의미가 있었고 그 와중에 제가 겪었던 다양한 이슈를 공유하게 되면 다른 분들은 저와 같은 시행착오를 거치지 않지 않을까 하는 바람에 별거 없지만 제가 작업한 내용을 공유드립니다.
역시나 먼저 해보신 분들이 공유해주신 글에서 많은 도움을 얻었습니다.
Embulk로 Bigquery에 데이터 로딩하기
https://jungwoon.github.io/bigdata/2017/09/01/Embulk_BigQuery/
Apache Airflow를 이용한 데이터 워크플로우 자동화
http://whitechoi.tistory.com/50
빅데이타 수집을 위한 데이타 수집 솔루션 Embulk 소개
http://bcho.tistory.com/1126
#1) Embulk 설치
우선 Embulk 라는 오픈 소스는 ‘http://www.embulk.org’ 공식 사이트에도 언급되어 있지만 다양한 데이터베이스, 저장소, 파일 형식 및 클라우드 서비스간에 데이터를 전송하는 데 도움이되는 오픈 소스 대량 데이터 로더입니다. 하단의 그림을 보면 바로 이해하실 수 있을 것인데 한쪽 서비스에서 다른 쪽 서비스로 data 를 벌크로 로딩하는 작업을 plugin 을 통해서 아주 쉽게 할 수 있도록 지원할 수 있습니다.
다만 Embulk 는 일반적인 ETL 과는 다르게 Transformation 은 기능은 조금 약하지만 Extraction, Loading 은 다양한 plugin 을 제공하여 꽤 쓸만합니다.
당연히 Embulk 를 사용하려면 먼저 설치해야 합니다. 그 부분은 이미 위에서 언급한 다른 좋은 강의에서 잘 설명해주셨으니 가능하시면 해당 링크를 참고하시기 바라며 이부분은 간단히 정리만 해보도록 하겠습니다.
JVM 및 Embulk 설치(보시면 아시겠지만 굉장히 간단하게 설치가 가능합니다.)
% sudo apt-get update
% sudo apt-get install default-jre
% curl — create-dirs -o ~/.embulk/bin/embulk -L “http://dl.embulk.org/embulk-latest.jar"
% chmod +x ~/.embulk/bin/embulk
% echo ‘export PATH=”$HOME/.embulk/bin:$PATH”’ >> ~/.bashrc
% source ~/.bashrc
다음으로 목표가 Oracle DB 에서 데이터를 읽어와서 BigQuery 에 결과를 쓸 예정이므로 하단과 같은 plugin 들을 설치합니다. Plugin 이름만 봐도 직관적으로 알 수 있겠지만 Oracle DB 를 input 으로 하는 plugin 과 BigQuery 를 output 으로 하는 plugin 입니다. (당연히 상황에 따라 다른 다양한 plugin 도 설치하여 활용 가능합니다. 보시는 것처럼 Plugin 이름들이 직관적이라 찾기에 용이합니다)
% embulk gem install embulk-input-oracle
% embulk gem install embulk-output-bigquery
여기까지 수행을 했으면 데이터를 퍼다 나르는 Embulk 쪽 준비는 어느정도 된거고 이제 실제 작업을 수행할 때 필요한 인증 작업에 대한 준비를 진행해야 합니다. Oracle DB 는 JDBC Driver 를 통해서 연결하므로 JDBC Driver 를 연결할 기본 정보만 가지고 있으면 됩니다.
BigQuery 를 연결할 때는 권한이 부여된 서비스 계정이 하나 필요하므로 Google cloud 콘솔의 IAM 메뉴에서 서비스 계정을 하나 만들고 BigQuery 에 접근하기 위한 권한을 부여한 후 해당 key 를 p12 형태로 다운로드 받습니다. 이렇게 받은 key 는 Embulk 를 수행할 GCE 에 업데이트(복사) 해둡니다. 결국은 Embulk 가 배치를 수행하여 BigQuery 로 데이터를 밀어 넣을때 이렇게 만든 서비스 계정과 key 를 사용해서 인증하게 되고 접근 가능해집니다.
#2) Embulk 작업 파일 생성 및 수행
다음으로 Embulk 의 실제 작업 수행 절차를 정의하는 config.yml 형태의 파일을 만듭니다. 이 설정 또한 굉장히 직관적으로 수행 환경에 대한 설정을 위한 exec, input 을 정의하는 in, output 을 정의하는 out 으로 되어 있습니다. 그리고 각 세부 설정은 각 plugin 의 정책을 따릅니다. (각 정책은 하단의 링크에 있는 github 를 참고하시면 됩니다.)
Oracle input plugin for Embulk
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-oracle
embulk-output-bigquery
https://github.com/embulk/embulk-output-bigquery
— — ora_bq_test.yml — —
exec:
max_threads: 20
min_output_task: 10
in:
type: oracle
driver_path: /home/embulkT/ojdbc7.jar
host: 10.140.0.2
user: TEST_DW
password: “TEST_DW”
database: orcl
query: |
Select * from TEST_DB
out:
type: bigquery
mode: replace
auth_method: private_key #default
service_account_email: [email protected]
p12_keyfile: /home/embulkT/jwlee-myproject-01-f12e5fb9e3d7.p12
project: jwlee-myproject-01
dataset: a_test
table: TEST_DB
auto_create_table: true
auto_create_dataset: true
ignore_unknown_values: true
allow_quoted_newlines: true
path_prefix: /data1/embulk
open_timeout_sec : 7200
send_timeout_sec : 7200
read_timeout_sec : 7200
여기서 참고하실 부분은 작업 설정 파일의 SQL 문장인데 명시된 그대로 Oracle DB 를 읽어오므로 날자별로 자르거나 분리해서 작업하는 경우라도 설정의 SQL 만 변경해주면 됩니다.
작업 설정 파일을 작성한 후 embulk preview ora_bq_test.yml 명령을 통해서 설정상의 이슈가 없는지 점검한 후에 이제 실제 데이터를 옮기는 작업을 수행합니다.
가급적이면 cmd 창이 닫혀도 배치가 중단되지 않도록 nohup 으로 해당 명령을 수행합니다. 또한, nohup 으로 한 경우에는 해당 명령을 수행한 디렉토리에 nohup.log 로 로그도 남게 되므로 문제 발생시에 차근 차근 분석도 가능하여 일석 이조의 효과를 얻을 수 있습니다.
nohup embulk run ora_bq_test.yml — log-level debug &
이렇게 하면 정상적으로 Oracle DB 의 데이터를 Embulk 를 활용하여 아주 쉽게 BigQuery 로 이동시킬 수 있습니다. 당연히 컴퓨터 사양에따라 성능차이가 있겠지만 몇 시간 정도면 10억건이 넘는 데이터에 대한 마이그레이션이 가능합니다. (가장 중요한 것은 배치라 돌려놓고 자리를 비우거나 퇴근이 가능하다는 ㅎㅎ) 정리는 이렇게 간단히 하지만 사실 하면서 이미 언급한 것처럼 다양한 시행착오를 많이 겪었습니다. 이번 이야기의 중점적인 사항은 이런 시행착오를 제 글을 읽는 분들은 똑같이 겪지 않았으면 하는 바램으로 해당 부분에 대해서 정리한것을 추가 공유하도록 하겠습니다.
#3) Embulk 작업 중에 발생한 Error
① org.embulk.exec.PartialExecutionException
org.embulk.exec.PartialExecutionException: org.jruby.exceptions.RaiseException: (SSLError) certificate verify failed
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(BulkLoader.java:340)
at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:548)
at org.embulk.exec.BulkLoader.access$000(BulkLoader.java:35)
at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:353)
at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:350)
at org.embulk.spi.Exec.doWith(Exec.java:22)
at org.embulk.exec.BulkLoader.run(BulkLoader.java:350)
at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:162)
at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:292)
at org.embulk.EmbulkRunner.run(EmbulkRunner.java:156)
해결 방법 : 하단의 링크를 통해서 삭제된 equifax root ca 를 다시 만들어서 os 차원으로 업데이트 합니다.
http://www.unknownengineer.net/entry/2017/12/15/100000
② org.embulk.exec.PooledBufferAllocator$BufferDoubleReleasedException
2018–03–04 03:43:06.575 +0000 [INFO] (0040:task-0000): Fetched 4,096,000 rows.
2018–03–04 03:44:31.572 +0000 [INFO] (0040:task-0000): Fetched 8,192,000 rows.
org.embulk.exec.PooledBufferAllocator$BufferDoubleReleasedException: Detected double release() call of a buffer
at org.embulk.exec.PooledBufferAllocator$NettyByteBufBuffer.release(PooledBufferAllocator.java:46)
at org.embulk.spi.Page.release(Page.java:68)
at org.embulk.exec.LocalExecutorPlugin$ScatterTransactionalPageOutput$OutputWorker.call(LocalExecutorPlugin.java:357)
at org.embulk.exec.LocalExecutorPlugin$ScatterTransactionalPageOutput$OutputWorker.call(LocalExecutorPlugin.java:288)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.embulk.exec.PooledBufferAllocator$BufferReleasedBeforeAt
at org.embulk.exec.PooledBufferAllocator$NettyByteBufBuffer.release(PooledBufferAllocator.java:51)
at org.embulk.spi.PageReader.close(PageReader.java:152)
at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
해결 방법 : embulk 수행중에 하단과 같이 /tmp 폴더에 버퍼형태의 csv 파일이 생성되고 공간이 없으면 상단의 error 가 발생됨. 따라서 /tmp 삭제해서 충분한 공간을 확보하거나 config.yml 에서 path_prefix: /data1/embulk 를 추가하여 공간이 있는 곳으로 버퍼 파일 생성 위치를 변경하면 됨
③ org.jruby.exceptions.RaiseException: (TransmissionError) execution expired
2018–03–04 07:52:30.916 +0000 [INFO] (0001:transaction): embulk-output-bigquery: delete /data1/embulk_.20516.2030.csv
org.embulk.exec.PartialExecutionException: org.jruby.exceptions.RaiseException: (TransmissionError) execution expired
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(BulkLoader.java:340)
at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:548)
at org.embulk.exec.BulkLoader.access$000(BulkLoader.java:35)
at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:353)
at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:350)
at org.embulk.spi.Exec.doWith(Exec.java:22)
at org.embulk.exec.BulkLoader.run(BulkLoader.java:350)
at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:162)
at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:292)
at org.embulk.EmbulkRunner.run(EmbulkRunner.java:156)
at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:437)
at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:92)
at org.embulk.cli.Main.main(Main.java:26)
Caused by: org.jruby.exceptions.RaiseException: (TransmissionError) execution expired
at RUBY.error(/home/jerryjg/.embulk/lib/gems/gems/google-api-client-0.19.8/lib/google/apis/core/http_command.rb:272)
at RUBY.execute_once(/home/jerryjg/.embulk/lib/gems/gems/google-api-client-0.19.8/lib/google/apis/core/upload.rb:264)
at RUBY.block in execute(/home/jerryjg/.embulk/lib/gems/gems/google-api-client-0.19.8/lib/google/apis/core/http_command.rb:104)
해결 방법 : 대량 데이터에 대한 작업이 필요한 경우 하단의 링크 참고하여 timeout 설정을 늘려야 함
https://qiita.com/wroc/items/7d35fdbd7b299482cbbc
open_timeout_sec : 7200
send_timeout_sec : 7200
read_timeout_sec : 7200
참고로 저처럼 시행착오가 많아서 테스트를 많이하는 경우에는 BigQuery 의 update table quota 에 걸릴수가 있습니다. 또는 더 많은 데이터를 좀 더 안정적이고 순자척으로 이동하고 싶다면 Oracle DB 에서 바로 BigQuery 에 넣는 것보다 중간에 GCS 를 두고 먼저 Oracle DB 에서 csv 파일 형태로 GCS 로 옮기고 그 뒤 다시 BigQuery 로 로딩 하는 것도 좋은 방안입니다.(참고로 Embulk 는 이미 이것도 준비하고 있다는…)
gcs_bucket: bucket_name
auto_create_gcs_bucket: false