您的当前位置:首页正文

json数据入库kafka

2021-02-17 来源:好走旅游网
json数据入库kafka

package main.scala.com.web.zhangyong168.cn.spark.java;import com.alibaba.fastjson.JSONObject;

import com.web.zhangyong168.cn.spark.util.PropertiesUtils;import org.apache.kafka.clients.admin.AdminClient;

import org.apache.kafka.clients.admin.KafkaAdminClient;import org.apache.kafka.clients.admin.NewTopic;

import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;import org.sparkproject.guava.collect.Lists;import java.util.*;/**

 * @version 1.0.0
 * @Author zhangyong
 * @Description json数据入库kafka
 * @Date 2020/06/05 14:40
 */

public class WirteKafka {/**

配置⽂件的路径

@param proUrl 配置⽂件路径

@param runModel 运⾏模式 test dev produce

@return properties*/

public static Properties getProperties(String proUrl, String runModel) {Properties props = PropertiesUtils.loadProps(\"kafka.properties\");Properties properties = new Properties();

properties.put(\"bootstrap.servers\properties.put(\"zookeeper.connect\properties.put(\"group.id\

properties.put(\"key.serializer\properties.put(\"value.serializer\return properties;}/**

获得数据结果集

@param accessArray 参数@return list*/

public static List> getResultList(AccessArray accessArray) {List> list = new ArrayList<>();

int columnNamelengths = accessArray.getColumnNames().length;for (Object[] tmpValue : accessArray.getRecordArrayValue()) {Map parameters = new LinkedHashMap<>();if (columnNamelengths == tmpValue.length) {for (int j = 0; j < columnNamelengths; j++) {

parameters.put(accessArray.getColumnName(j), tmpValue[j].toString());}}

list.add(parameters);

}

return list;}/**

添加kafak数据

@param data 数据*/

public static void insertKafkaDatas(String data) {

Properties props = getProperties(\"kafka.properties\

AdminClient create = KafkaAdminClient.create(props);//创建Topic

create.createTopics(Lists.newArrayList(new NewTopic(\"lanhuahua\Producer producer = new KafkaProducer(props);//没有key的存⼊

(new ProducerRecord(\"lanhuahua\//key 存⼊

producer.send(new ProducerRecord(\"lanhuahua\create.close();producer.close();}

public static void main(String[] args) {

String str1 = \"{\"tableName\":\"yunduo.tb_role_user\\"\"columnTypes\":[0,0,0],\" +

\"\"columnValues\":[[\"daniel\\"[\"huahua\\

AccessArray accessArray = JSONObject.parseObject(str1, AccessArray.class);System.out.println(accessArray);

List> list = getResultList(accessArray);insertKafkaDatas(str1.toString());// kafkaProducer(\"你是我的眼3\");}}

因篇幅问题不能全部显示,请点此查看更多更全内容