프로듀서는 카프카에서 메시지를 생산해서 토픽으로 메시지를 보내는 역할을 한다. 토픽으로 메시지를 보낼 때는 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보낸다. 이때 키 값을 정해서 키를 가진 모든 메시지를 동일한 파티션으로 보낼 수 있다. 정하지 않는다면 라운드 로빈 방식으로 파티션에 분배된다.
1. 프로듀서가 메시지를 보내는 세 가지 방법
1.1 메시지를 보내고 확인하지 않기
프로듀서가 서버로 메시지를 보내고 성공적으로 도착했는지를 확인하지 않는다. 카프카는 항상 살아있고 프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만, 메시지 손실 가능성이 존재한다.
1.2 동기 전송
프로듀서는 메시지를 보내고 send() 메서드의 Future 객체를 반환한다. get() 메서드를 사용해 Future를 기다린 후 send()가 성공했는지 실패했는지를 확인한다. 이 때문에 보다 신뢰성 있는 메시지를 전송할 수 있다.
1
2
// spring kafka의 producer interface에 있는 동기 전송 메서드
Future<RecordMetadata> send(ProducerRecord<K, V> var1);
1.3 비동기 전송
프로듀서는 send()를 콜백과 같이 호출하고, 브로커에서 응답을 받으면 콜백 한다. 프로듀서가 보낸 모든 메시지에 대한 응답을 모두 기다리는 동기 처리 방식은 응답을 기다리는 시간을 소요한다. 그러나 비동기적으로 전송하면 응답을 기다리지 않아 더 빠른 전송이 가능하다. 또 한, 메시지를 보내지 못했을 때 예외 처리를 하게 해서 기록하고 분석에 활용할 수 있다.
1
2
// spring kafka의 producer interface에 있는 비동기 전송 메서드
Future<RecordMetadata> send(ProducerRecord<K, V> var1, Callback var2);
2. 프로듀서 주요 옵션
- bootstrap.servers: 카프카 클러스터는 마스터 개념이 없기에 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보를 적어야 한다. name:port, name:port와 같은 리스트 형태로 적으면 된다. 하나의 호스트만 적어도 돼지만, 해당 호스트에 장애가 발생하면 접속이 안되는 현상이 발생한다. 따라서 호스트 전체를 입력하는 것이 좋다.
- acks: 프로듀서가 카프카 토픽 리더에게 메시지를 보는 후 요청을 완료하기 전 ack의 수이다. ack는 다음과 같은 값을 가질 수 있다. 기본 값은 1이다.
- acks=0:
- 프로듀서가 서버로부터 ack를 기다리지 않는다. 서버가 데이터를 받았는지 보장하지 않는다, 클라이언트는 전송 실패에 대한 결과를 알지 못하기에 재요청 설정을 할 수 없다. 대신 메시지 처리량이 매우 높다.
- acks=1:
- 리더가 데이터를 기록한 것을 확인한다. 다만, 팔로워가 기록했는지는 확인하지 않는다. 때문에 acks=0 보다 메시지 손실률이 낮긴 하지만 여전히 일부 데이터 손실이 발생할 수 있고 기다리는 시간이 추가되어 속도가 약간 떨어질 수 있다
- 이 옵션의 경우 리더가 자신의 브로커에 데이터를 저장하고 ack를 보낸 직후에 장애가 발생해서 팔로워들이 리더로부터 데이터를 가져올 수 없는 경우 메시지 손실이 발생한다. 다만 이는 극히 예외적인 경우이다. 최근 프로듀서 애플리케이션으로 많이 사용하는 log stash, filebeat 등에서도 acks를 1로 설정한다. 따라서 특별한 경우가 아니면 속도와 안전성을 확보할 수 있는 acks=1이 좋다.
- acks=all 또는 -1:
- 리더는 ISR의 팔로워로부터 데이터에 대한 ack를 기다린다. 때문에 가장 강력한 데이터 무손실을 보장하는 옵션이다.
- 프로듀서가 메시지를 전송하고 리더와 팔로워가 모두 메시지를 받았는지를 확인한다. 속도는 가장 느리지만 메시지 손실을 허용하지 않는다.
- acks를 통해 팔로워에 레플리케이션이 생성된 것을 보장하기 위해선 min.insync.replicas 옵션을 설정해야 한다. 이는 최소한 레플리케이션이 적용돼야 하는 팩터 수를 지정한다. 예컨대 팩터 개수가 3이고 min.insync.replicas 값이 2라면 3 개의 팩터 중 팩터 2개에 레플리케이션이 적용되면 ack를 응답한다. 따라서 min.insync.replicas 값이 1이면 acks가 all 이여도 acks=1과 같은 동작을 한다.
- 만약 설정한 min.insync.replicas 값보다 살아있는 브로커 수가 더 적다면, 옵션을 충족시키지 못해 에러가 발생한다. 이 때문에 장애에 대비해 팩터가 3일 경우 min.insync.replicas 값을 2로 설정하는 것을 권장한다.
- acks=0:
- buffer.memory: 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등) 할 수 있는 전체 메모리 바이트이다. 기본 값은 32MB이다.
- compression.type: 프로듀서가 데이터를 압축해서 보낼 때, 어떤 타입으로 압축할지를 결정한다. none, gzip, snapy, lz4 등의 포맷 중 하나를 선택해 사용하면 된다.
- retries: 일시적 오류로 인해 전소에 실패했을 때, 데이터를 다시 보낼 수 있는 횟수이다. 다만 모든 에러에 대해 재시도를 하진 않는다 RetriableException 이 발생한 경우에 대해서만 재시도를 한다.
- batch.size: 같은 파티션으로 보내는 여러 데이터를 한 번에 배치로 보내는 옵션이다. 이 옵션으로 배치를 바이트 단위로 조정할 수 있다. 배치를 보내기 전 클라이언트 장애가 발생하면 배치 내에 있는 메시지는 전달되지 않는다.
- linger.ms: 배치 형태의 메시지를 보내기 전에 메시지가 쌓이기까지 기다리는 시간을 조정한다. 만약 시간이 되기 전에 배치 사이즈까지 도달하면 바로 메시지를 보낸다. 배치 사이즈까지 도달하지 못했지만 시간이 도달해도 메시지를 보낸다. 기본값은 0이다.
- max.request.size: 프로듀서가 보낼 수 있는 최대 메시지 바이트 사이즈이다. 기본값은 메시지 1MB이다.
출처 - 카프카, 데이터플랫폼의 최강자