컴퓨터 지식 네트워크 - 컴퓨터 지식 - Kafka와 결합된 Flink

Kafka와 결합된 Flink

Flink는 Kafka 주제 데이터를 읽고 쓸 수 있는 고유한 Kafka 커넥터를 제공합니다. flink가 kafka 데이터를 사용할 때 정확히 한 번 의미 체계를 보장하기 위해 kafka 소비자 그룹의 오프셋을 추적하여 완전히 달성되는 것은 아닙니다. 대신 flink는 내부적으로 오프셋을 추적하고 체크포인트를 수행하여 정확히 한 번 의미 체계를 달성합니다.

flink Kafka와 통합되어 해당 버전의 Maven 종속성은 다음과 같습니다.

Maven 종속성 예시

1.7.0

2.11

2.11.12

<종속성>< /p >

? org.apache.flink

? flink-streaming-scala_${scala.binary.version}

? ${flink.version}

? 제공

Flink는 FlinkKafkaConsumer를 사용하여 kafka를 읽고 액세스합니다. kafka 버전에 따라 FlinkKafkaConsumer의 클래스 이름도 변경되어 FlinkKafkaConsumer가 됩니다.

[08,09,10...] 다음 숫자는 kafka 큰 버전 번호.

FlinkKafkaConsumer를 초기화하려면 다음 매개변수가 필요합니다.

1. 하나 이상의 주제에서 데이터 소비를 지정하는 데 사용되는 주제 이름

2. Kafka 구성 정보, zk 주소 포트, kafka 주소 포트 등.

3. 디시리얼라이저(스키마), 소비 데이터를 디시리얼라이즈할 디시리얼라이저를 선택합니다.

flink kafka의 소비자 측에서는 kafka의 메시지 데이터를 java 또는 scala의 객체로 역직렬화하는 방법을 알아야 합니다. DeserializationSchema를 사용하면 각 kafka 메시지가 DeserializationSchema의 eserialize(byte[] 메시지) 메서드에 따라 작동합니다. Kafka 메시지를 사용자가 원하는 구조로 변환합니다.

사용자는 사용자 정의할 수 있는 KeyedDeserializationSchema 또는 DeserializationSchema 인터페이스를 구현하여 주로 수행되는 사용자 정의 스키마를 통해 액세스 데이터를 사용자 정의 데이터 구조로 변환합니다.

Flink의 내장 DeserializationSchema 구현에는 다음이 포함됩니다.

public class SimpleStringSchema Implements DeserializationSchema

public class TypeInformationSerializationSchema Implements DeserializationSchema

KeyedDeserializationSchema 구현은 다음과 같습니다.

공용 클래스 TypeInformationKeyValueSerializationSchema 구현 KeyedDeserializationSchema>

공용 클래스 JSONKeyValueDeserializationSchema 구현 KeyedDeserializationSchema

예:

val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p)

public class MySchema Implements KeyedDeserializationSchema {

@Override

public KafkaMsgDTO deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {

String msg = new String(message, StandardCharsets.UTF_8);

문자열 키 = null;

if(messageKey != null){

key = new String(messageKey, StandardCharsets.UTF_8) ;

}

return new KafkaMsgDTO(msg,key,topic,partition,offset);

}

@Override

public boolean isEndOfStream(KafkaMsgDTO nextElement) {

false 반환;

}

@Override

public TypeInformation< KafkaMsgDTO> getProducedType() {

return getForClass(KafkaMsgDTO.class);

}

}

<종속성>

? org.apache.flink

? flink-connector-kafka-base_2.11

version>1.7.0

public class KafkaMsgDTO {

private String topic;

private int 파티션;

비공개 긴 오프셋;

비공개 문자열 메시지;

@Override

공개 문자열 toString() {

return "KafkaMsgDTO{" +

"topic='" + 주제 + '\'' +

", partition=" + 파티션 +

", offset=" + 오프셋 +

", mesg='" + mesg + '\'' +

", key='" + key + '\' ' +

'}';

}

개인 문자열 키;

공개 KafkaMsgDTO(){

}

public KafkaMsgDTO(String mesg,String key,String topic,int partition,long offset){

this.mesg = mesg;

this. 키 = 키;

this.topic = 주제;

this.partition = 파티션;

this.offset = 오프셋;

}

public String getKey() {

반환 키;

}

public void setKey(String key) {

this.key = 키;

}

public String getTopic() {

주제 반환;

}

public void setTopic(String topic) {

this.topic = topic;

}

public int getPartition() {

파티션 반환;

}

public void setPartition(int partition) {

this.partition = 파티션;

}

public long getOffset() {

오프셋 반환;

}

public void setOffset(long offset) {

this.offset = 오프셋;

}

public String getMesg() {

return mesg;

}

public void setMesg(String mesg) {

this.mesg = mesg;

}

}

val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("topic",new MySchema(),p)

// myConsumer.setStartFromEarliest() ?

//가장 빠른 것부터 소비 시작 , 소비된 데이터는 반복적으로 소비됩니다. Kafka의 관점에서는 기본적으로 오프셋이 제출되지 않습니다.

// myConsumer.setStartFromLatest()

//다음에서 소비를 시작합니다. 최신, 흐름이 시작되기 전에 소비하지 마세요. 소비되지 않은 데이터의 경우 Kafka는 기본적으로 오프셋을 제출하지 않습니다.

? myConsumer.setStartFromGroupOffsets()

//소비된 데이터에서 소비를 시작합니다. Kafka가 오프셋을 제출했습니다. 이것이 기본 소비 방법입니다.

//체크포인트가 완료되지 않고 데이터가 싱크에 들어가면 오프셋이 제출됩니다. . 오프셋은 여전히 ​​제출되고 프로그램은 종료됩니다. 스트림이 다시 시작되면 소비되지 못한 데이터는 다시 소비되지 않습니다.

//체크포인트가 완료되면 종단간 정확합니다. 데이터 소비가 보장됩니다. 싱크의 로직이 실패하면 오프셋이 제출되지 않습니다.

env.enableCheckpointing(5000);

val stream = env.addSource(myConsumer)

stream.addSink(x =>{

? println(x)

? println(1/(x.getMesg.toInt%2))//메시지가 다음과 같은 경우 짝수이면 오류가 보고되고 분모는 0

? println(x)

})

val stream = env.addSource(myConsumer )

//실험에서는 싱크 처리가 비동기 스레드가 실패하는 경우 논리에 실행 중인 스레드가 있는 경우를 보여줍니다. 오프셋은 계속 제출됩니다.

stream.addSink(x=>{

? println(x)

? new Thread(new Runnable {

재정의 def run(): Unit = {

? println(1/(x.getMesg.toInt%2))//메시지가 짝수이면 오류가 보고되고 분모는 0입니다.

}

? }).start()

? println(x)

})

val 특정StartOffsets = new java.util.HashMap [KafkaTopicPartition, java.lang.Long]()

특정StartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)

특정StartOffsets.put(new KafkaTopicPartition("myTopic ", 1), 31L)

특정StartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets( 특정StartOffsets)

上篇: Cpu 모델 뒤에 U 가 있다는 게 무슨 뜻인가요? 下篇: HTC 호스트 일련번호는 SH138PY02176입니다. 이는 무엇을 의미하나요? 그리고 상하이, 우한, 신주 등 생산지 간에 차이가 있나요?
관련 내용