티스토리 뷰

LinkedIn 개발 사례

LinkedIn에서는 큰 규모의 서비스를 운영함과 동시에 데이터 수집 시스템을 운영하는 데에 어려움이 발생했다.

다양한 Source/Target 어플리케이션에 Data Pipeline가 구축되어있다보니, 데이터 파편화 현상이 발생했다.

이에 따라 Kafka를 개발하여 시스템 간 Data Pipeline으로 배치하였다.

결과적으로 기존의 복잡한 Data Pipeline이 중앙집중화(일원화) 되었으며 시스템 간 결합도가 감소되었다.

 

패러다임의 변화

이전에는 일반적으로 서비스를 운영하며 비즈니스에 관련된 데이터만을 수집하여 서비스에 활용했었다.

그러나 최근에는 기본적으로 사용자에 관련된 모든 데이터를 수집하고, 이러한 데이터(빅데이터)를 기반으로 인사이트를 도출한다.

이와 같은 패러다임 변화에 의해 서비스 운영의 범위 내에 Data Pipeline을 구축하여 Data Lake를 구성하는 것이 일반적이게 되었다. (LinkedIn 사례)

 

Data Pipeline 구성에 Kafka가 가지는 이점?

그렇다면 Data Pipeline을 구성하는데 Kafka를 사용하면 가질 수 있는 이점은 어떤 것들이 있을까?

  1. High Throughput
    1. Kafka는 데이터를 묶어서 전송하기 때문에 네트워크 비용을 절약하여 높은 처리량을 가진다.
    2. 파티션과 컨슈머의 갯수를 조정하여 병렬처리하여 높은 처리량을 가진다.
  2. 확장성
    1. 클러스터 구성하여 브로커 갯수를 Scale-in/out 하여 유연성을 확보할 수 있다.
  3. 영속성
    1. File System에 데이터를 저장함으로써 영속성을 보장한다.
    2. 받은 데이터를 메인 메모리의 페이지 캐시에 저장함으로써 File I/O 지연을 줄인다.
  4. High Availability
    1. 클러스터 구성하여 Data를 Replication하여 내결함성과 높은 가용성을 가진다.

 

EC2에서 Kafka 실행하기

JDK 설치

우선 EC2 인스턴스를 하나 받아서 JDK를 설치한다. 카프카 브로커는 JVM 환경 위에서 실행되기 때문이다.

sudo yum install -y java-1.8.0-openjdk-devel.x86_64

java -version

위 명령어를 실행하여 JDK를 설치하고 java 버전을 확인한다.

설치된 java 버전 확인

카프카 바이너리 패키지 설치

JDK를 정상적으로 설치했다면 카프카 바이너리 패키지를 설치한다.

카프카 바이너리 패키지는 Kafka 실행에 대한 여러 파일이 들어있다.

https://kafka.apache.org/downloads 사이트에 접속하여 Scala 2.12 버전에 맞는 패키지 다운로드 링크를 복사하여 설치한다.

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz

tar xvf kafka_2.12-3.7.0.tgz

위 명령어로 설치한 디렉토리의 bin 디렉토리에 접근하면 여러 실행 쉘을 확인할 수 있다.

Kafka 관련 실행 파일

힙 메모리 설정

그리고 나서 카프카 브로커, 주키퍼 힙 메모리를 설정한다.

카프카 브로커는 처리하는 레코드를 시스템 메모리의 페이지 캐시에 저장하고, 그 외의 객체들은 힙 영역에 저장한다.

그러나 기본적으로 카프카 브로커의 힙 사이즈는 1G이고, 주키퍼의 기본 힙 사이즈는 512M이다.

kafak-server-start.sh 파일
zookeeper-server-start.sh 파일

위의 카프카 서버 시작 쉘의 중간 지점에 보면 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 라는 설정을, 주키퍼 서버 시작 쉘의 중간 지점에 export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M" 설정을 확인할 수 있다.

현재 EC2 인스턴스의 메모리 크기는 1G이므로 주키퍼와 카프카 브로커를 동시에 실행할 수 없는 환경이다. 따라서 힙 사이즈 환경변수를 설정할 필요가 있다.

EC2 인스턴스 메모리 사이즈
KAFKA_HEAP_OPTS 환경변수 설정 변경

위와 같이 환경변수를 설정하더라도 SSH 세션이 종료되고 나면 환경변수를 초기화되어버린다. 따라서 아래와 같이 초기화되지 않도록 환경 변수를 추가로 설정해준다.

 

vi ~/.bashrc 명령어로 bash 쉘 실행시 구동되는 파일 내에 환경변수 설정 추가

 

카프카 브로커 실행 옵션 설정

config 폴더 내 server.properties 파일에는 카프카 브로커에 관련된 옵션들이 설정되어 있다.

server.properties 1
server.properties 2
server.properties 3

해당 파일에서 설정할 수 있는 옵션에 대해서 하나씩 살펴보자.

  • broker.id: 실행하는 카프카 브로커의 번호 (브로커를 구분하기 위함, 고유해야함)
  • listeners: 카프카 브로커가 통신을 위해 열어둘 인터페이스 IP, port, protocol (미설정시 접속 제한 없음)
  • advertised.listeners: 카프카 클라이언트, 카프카 CLI에서 접속할 때 사용하는 IP, port 정보
  • listener.security.protocol.map: SALS_SSL, SASL_PLAIN 보안 설정 시 protocol 매핑을 위한 설정
  • num.network.threads: 네트워크를 통한 처리 시 사용할 스레드 개수
  • num.io.threads: 카프카 브로커 내부에서 사용할 스레드 개수
  • log.dirs: 가져온 데이터를 파일로 저장할 디렉토리 위치
  • num.partitions: 디폴트 파티션 개수
  • log.retention.hours: 저장한 파일이 삭제되기까지 걸리는 시간 (log.retention.ms 사용가능, -1 설정 시 삭제 안함)
  • log.segment.bytes: 저장할 파일의 최대 크기 (넘는 경우 새로운 파일 생성)
  • log.retention.check.interval.ms: 저장한 파일을 삭제하기위해 체크하는 시간 주기
  • zookeeper.connect: 카프카 브로커와 연동할 주키퍼의 IP, port
  • zookeeper.connection.timeout: 주키퍼의 세션 타임아웃 시간

이렇게 설정한 옵션 중 당장 실행하기 위해서는 수정해야할 옵션이 두 가지 있다.

advertised.listeners 옵션을 현재 접속하고 있는 EC2 인스턴스의 IP주소의 9092 포트로 설정한다.

그리고 zookeeper.connect 옵션을 localhost의 2181 포트로 설정한다.

 

주키퍼 실행하기

주키퍼 실행하고 상태 확인

./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

jps -vm

위 명령어를 입력하여 주키퍼를 실행하였다. (-daemon 옵션을 붙여 백그라운드에서 실행)

jps는 JVM의 상태를 확인하는 도구로써 JVM 위에서 동작하는 주키퍼의 상태를 확인할 수 있다.

 

카프카 브로커 실행하기

카프카 브로커 실행하고 상태 및 로그 확인

./kafka-server-start.sh -daemon ../config/server.properties

jps -m

tail -f ../logs/server.log

위 명령어를 실행하여 카프카 브로커를 실행하고, 정상적으로 동작하고 있는지 확인했다.

그리고 로그를 모니터링 할 수 있도록 tail 명령어를 수행했다.

 

이제 로컬 PC에서 새로운 터미널을 띄워 EC2 인스턴스의 9092 포트로 요청을 보내서 통신이 잘되는지 확인해보자.

로컬 환경에 카프카 바이너리 설치
카프카 바이너리 파일 압축 풀기
EC2 인스턴스의 9092 포트로 카프카 관련 정보 조회

 

Kafka Command-Line Tool

카프카를 운영하는데에 있어 데이터를 주고받는 것도 중요하지만, 카프카의 토픽이나 파티션 개수 등 메타 데이터에 대한 명령을 실행하는 것 역시 중요하다.

이러한 명령을 실행하는 방법은 카프카 브로커가 설치된 인스턴스에 SSH로 접속하여 실행하는 방법과, 카프카 브로커에 접근가능한 컴퓨터에서 명령을 실행할 수도 있다.

그렇다면 로컬 컴퓨터에 카프카 커맨드 라인 툴 명령을 EC2 인스턴스의 카프카 브로커로 요청해보자.

 

kafka-topics.sh

이 커맨드 라인 툴을 활용하여 토픽과 관련된 명령을 실행할 수 있다. (토픽이란 데이터를 주고받을 수 있는 공간이라고 보고 일단 넘어가자)

--create 명령 실행

./kafka-topics.sh --create --bootstrap-server kafka-demo-instance:9092 --topic first.topic

로컬 환경에서 위 명령어를 실행해서 토픽을 성공적으로 생성했다.

--create 옵션은 토픽 생성 명령어라는 의미이다.

--bootstrap-server 옵션은 클러스터를 구성하는 브로커들의 주소를 적는다. (위 경우는 hosts 파일에 ip alias를 미리 정의해두었다)

--topic 옵션은 토픽 이름을 작성한다. 토픽 이름은 내부 데이터를 유추할 수 있을 정도로 명확하게 작성한다. (유지보수성)

 

--create 명령 실행 2

--partitions 옵션은 파티션 개수를 지정하는 옵션이다. 최소 개수는 1개이며 이 옵션을 사용하지 않으면 설정파일 기준으로 생성한다.

--replication-factor 옵션은 파티션을 복제할 복제 개수를 설정한다. 1은 복제하지 않는다는 의미이며 최대 개수는 브로커의 개수이다.

--config 옵션을 통해 kafka-topics.sh에 포함되지 않는 설정을 추가할 수 있다. retention.ms는 토픽 데이터를 유지하는 기간이다.

 

--list 명령 실행

카프카 클러스터에 생성된 토픽을 --list 옵션을 통해 확인할 수 있다. 

 

--describe 명령 실행

생성된 토픽의 상세 상태를 --describe 옵션을 사용하여 확인할 수 있다. 

토픽의 파티션, 복제, 기타 설정 등에 대한 상세한 정보를 확인할 수 있다.

하나의 토픽을 구성하는 여러 파티션이 카프카 클러스터의 특정 브로커에 집중되어 있다면 이를 분산시켜 네트워크 병목현상을 해소시킬 필요가 있다. 이 경우 해당 명령어를 활용하기에 좋다.

 

first.topic 파티션 4개로 변경

--alter 옵션을 사용하면 토픽의 설정을 쉽게 수정할 수 있다.

위 예제를 보면 first.topic의 파티션을 4개로 변경하고 성공적으로 수정 내용을 확인할 수 있다.

 

kafka-configs.sh로 토픽 설정 변경

kafka-configs.sh을 활용해 다이나믹 토픽 설정을 수정할 수 있다. 이러한 설정들은 kafka-topics.sh로는 수정할 수 없다.

(이러한 설정들이 점점 분리되는 추세이다)

 

kafka-console-producer.sh

그렇다면 이제 생성된 토픽에 데이터를 제공하는 kafka-console-producer.sh 명령어를 실행해보자.

이러한 데이터를 레코드라고하며 key=value 구조를 가진다.

메세지 전송

그렇다면 위와 같이 메세지를 전송하면 어떻게 될까?

key가 null인 메세지가 브로커로 전송된다.

여기에서 주의할 점은 kafka-console-producer.sh로 전송되는 값은 UTF-8 기반의 Byte로 변환되어 ByteArraySerializer로 직렬화된다. 따라서 String 타입으로만 데이터를 전송할 수 있다.

 

key=value 형식의 메세지 전송

위와 같이 parse.key 속성을 추가하면 메시지 키를 추가해서 전송할 수 있다.

그리고 key.separator 속성으로 key, value의 구분자를 설정할 수 있다.

 

기본적으로 key를 null로 전달하게 되면 청크 단위만큼 라운드로빈 방식으로 파티션에 할당된다.

그렇지만 key를 따로 설정하는 경우에는 key의 해시값을 기준으로 해당 키의 메세지가 이미 존재하는 파티션에 할당된다.

 

kafka-console-consumer.sh

producer가 전송한 메세지를 kafka-console-consumer.sh 명령어로 확인할 수 있다.

메세지 수신

--from-beginning 옵션을 추가로 설정해서 토픽에 저장된 가장 처음 데이터부터 출력했다.

그러나 출력 순서는 입력 순서와 다르게 구성되는데, 그 이유는 해당 토픽이 여러 개의 파티션을 가지기 때문이다.

 

key-value 구조로 메세지 수신

--group 옵션을 설정해서 신규 consumer group을 생성했다.

consumer group은 1개 이상의 컨슈머로 구성되며, 이 컨슈머 그룹에 가져간 토픽의 메시지는 커밋한다.

커밋이란 컨슈머가 특정 레코드까지 처리완료했다고 카프카 브로커에 저장하는 것이다. (__consumer_offsets 이름의 내부 토픽에 저장)

 

kafka-consumer-groups.sh

컨슈머 그룹 목록 조회

위와 같이 kafka-consumer-groups.sh 명령어에 --list 옵션을 입력하면 컨슈머 그룹 목록을 조회할 수 있다.

 

컨슈머 그룹 상세정보 조회

위와 같이 명령어를 실행하여 first-group의 상세정보를 조회했다.

여기서 여러가지 정보를 조회할 수 있는데 처음 들어보는 항목은 아래의 내용을 참고하자.

  • OFFSET: 파티션의 각 레코드에 할당된 번호, 데이터가 입력될 때마다 1씩 증가된 값을 할당받음
  • CURRENT-OFFSET: 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 OFFSET이 몇 번인지 표현
  • LOG-END-OFFSET: 컨슈머가 커밋한 마지막 OFFSET (CURRENT-OFFSET 보다 크거나 같다)
  • LAG: 파티션에 있는 데이터를 가져가는데 발생하는 지연에 대한 지표 (pub 속도 > sub 속도인 경우 증가)
  • CONSUMER-ID: 컨슈머의 파티션 할당을 카프카 내부적으로 구분하기 위해 사용하는 ID (clientId + uuid)
  • HOST: 컨슈머가 동작하는 host명
  • CLIENT-ID: 컨슈머에 할당된 ID (따로 설정할 수 있으며, 미설정의 경우 자동 생성)

'공부 > Apache Kafka' 카테고리의 다른 글

Producer와 Consumer  (0) 2024.05.05
Kafka 기본 개념  (0) 2024.04.17
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함