버전 확인
$ bin/kafka-topics.sh --version
Topic 관리
토픽 생성
$ bin/kafka-topics.sh --create --zookeeper ${zookeeper} --replication-factor 3 --partitions ${partitions} --topic ${topicName}
* 토픽을 생성할 때 deadLetter도 함께 만들어 두는 습관을 갖자. deadLetter는 토픽의 파티션과 같은 숫자여야 한다.
토픽 확인
$ bin/kafka-topics.sh --describe --zookeeper ${zookeeper} --topic ${topicName}
토픽의 파티션 갯수, replication factor, retention등의 부가 설정을 확인할 수 있다.
토픽 목록 확인
$ bin/kafka-topics.sh --list --zookeeper ${zookeeper}
설정한 토픽 목록을 확인한다.
Consumer group 관리
컨슈머 그룹 목록 확인
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --list
컨슈머 그룹 개별 확인
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --describe --group ${consumerGroupName}
실제로 운영을 하다 보면 가장 많이 사용하게 되는 명령어 중 하나라고 생각이 든다.
Partition 당 상태를 볼 수 있다.
HOST | 연결되어 있는 컨슈머의 host 정보를 알 수 있다. |
LOG-END-OFFSET | 마지막 offset이 어디까지인지 알려준다. |
CURRENT-OFFSET | 현재의 offset 정보를 알려준다. |
LAG | {LOG-END-OFFSET} - {CURRENT-OFFSET} 의 값 Kafka 이벤트가 얼마나 빠르게 소진이 되고 있는지 확인이 가능하다. Retry 되고 있는 상황이라면 커밋되지 않은 상태의 메시지 갯수가 남아있는 것을 확인할 수 있다. |
컨슈머 그룹 offset 변경하기
dry-run은 offset이 어디까지 돌려지게 될지 시뮬레이션을 해주고 execute를 하면 실제로 돌아간다.
consumer가 없을 때만 가능해서 구독하고 있는 application 서버를 내려야 가능하다.
- 처음으로 되돌리기
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --group ${consumerGroupName} --topic ${topicName} --reset-offsets --to-earliest --dry-run
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --group ${consumerGroupName} --topic ${topicName} --reset-offsets --to-earliest --execute
- 특정 시점으로 되돌리기
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --group ${consumerGroupName} --topic ${topicName} --reset-offsets --to-datetime 2022-01-01T00:00:00Z --dry-run
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --group ${consumerGroupName} --topic ${topicName} --reset-offsets --to-earliest 2022-01-01T00:00:00Z --execute
불필요한 컨슈머 그룹 삭제
$ bin/kafka-consumer-groups.sh --bootstrap-server ${zookeeper} --delete --group ${consumerGroupName}
이벤트 관리
토픽에서 이벤트 읽기
- 처음부터 읽기
$ bin/kafka-console-consumer.sh --bootstrap-server ${zookeeper} --topic ${topicName} --from-beginning
- 현재 인입된 것부터 읽기
$ bin/kafka-console-consumer.sh --bootstrap-server ${zookeeper} --topic ${topicName}
이벤트 전송하기
- command line에서 직접 전송
$ bin/kafka-console-producer.sh --broker-list ${zookeeper} --topic ${topicName}
> {"key":"Hello","value":"world"}
- file에서 읽기
$ bin/kafka-console-producer.sh --broker-list ${zookeeper} --topic ${topicName} < content.txt
파티션 재할당
파티션 재할당 설정 파일 체크
$ bin/kafka-reassign-partitions.sh --zookeeper ${zookeeper} --topics-to-move-json-file myConfiguration.json --broker-list "1,2,3,4,5" --generate
// myConfiguration.json
{
"topics":[{"topic": "my.awesome.topic"}],
"version": 1
}
실행하게 되면 진행 상황을 확인할 수 있다.
$ bin/kafka-reassign-partitions.sh --zookeeper ${zookeeper} --reassignment-json-file myConfiguration.json --execute
'Programming > Architecture' 카테고리의 다른 글
Pub-Sub 구조에서 자주 발생하는 실수 (1) | 2022.02.16 |
---|