Springboot+阿里云kafka踩坑实录

简介:

场景描述:上文写到,不断接收数据并存放到OSS,现在要把数据存到MQ的kafka一份。


springboot版本为1.5.9。

开工之前先阅读阿里云官方kafka消息接入说明:https://help.aliyun.com/document_detail/52376.html


1、首先引入kafka jar包

189cfd06de6b670f27dd719ba9d29e90ebb47417

spring-kafka目前最新版本为2.1.2,其依赖的kafka-clients是1.0.x,但Kafka 服务端版本是 0.10,Client 版本建议 0.10,所以此处需排除依赖重新引入,否则一直报错:

Bootstrap broker kafka-ons-internet.aliyun.com:8080 disconnected


2、KafkaConfiguration.java


@Configuration
@EnableKafka
public class KafkaConfiguration {

	@Value("${kafka.broker.address}")
	private String brokerAddress;

	@Value("${kafka.default.topic}")
	private String defaultTopic;

	@Value("${kafka.jks.location}")
	private String jksLocation;

	public KafkaConfiguration() {
		URL authLocation = KafkaConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");
		if (System.getProperty("java.security.auth.login.config") == null) {
			System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());
		}
	}

	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<String, Object>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
		if (StringUtils.isEmpty(jksLocation)) {
			props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks").getPath());
		} else {
			props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);
		}
		props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
		props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
		props.put(SaslConfigs.SASL_MECHANISM, "ONS");
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
		return props;
	}

	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
		kafkaTemplate.setDefaultTopic(defaultTopic);
		return kafkaTemplate;
	}
}

注意此处定义了三个变量,通过配置文件注入:

brokerAddress kafka服务器地址

defaultTopic kafka默认topic

jksLocation JKS文件地址(开发环境无需定义,直接读取resources下的jks,但生产环境需读取jar包外部的jks文件,所以此处需配置路径)

03335a178aff1e00d0a797f8994ee46478548945


3、将《kafka消息接入说明》中的kafka_client_jaas.conf和根证书kafka.client.truststore.jks放到resources/目录下

48b2e332f3f9fa48d929fac0f9dc464e66fd45ca


4、KafkaService.java



@Component
public class KafkaService {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void send(ProbeData probeData) {
		String deviceMac = probeData.getDev_mac();
		String shopId = probeData.getShopId();
		List<UserData> data = probeData.getData();

		StringBuilder msgSB = new StringBuilder();
		if (data != null && !data.isEmpty()) {
			for (UserData userData : data) {
				String visitorMac = userData.getUsr_mac();
				String visitorTime = userData.getUsr_cap_time();
				String msg = shopId + "," + deviceMac + "," + visitorMac + "," + visitorTime;

				msgSB.append(msg).append(";");
			}
			ListenableFuture futher = kafkaTemplate.sendDefault(msgSB.toString());
			/*
				// 此处用于控制是否同步
				try {
					futher.get();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			*/
		}
	}
}


关键代码就一句:

kafkaTemplate.sendDefault(msgSB.toString());


此处为异步发送,一开始用测试类测试时总是写入不了kafka,后来发现是因为公网异步写入太慢,而测试类执行完后退出,导致异步中断。

测试时可以改为同步发送,即:

ListenableFuture futher = kafkaTemplate.sendDefault(msgSB.toString());
futher.get();




目录
相关文章
|
11天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
18天前
|
消息中间件 NoSQL Kafka
云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作
该方案描述了一个大数据ETL流程,其中阿里云Kafka消息根据内容触发函数计算(FC)函数,执行针对MongoDB的增、删、改操作。
|
1月前
|
算法 Java 开发工具
使用阿里云KMS产品针对 Springboot 接口参数加密解密功能
针对Springboot里面使用开源工具使用加解密,替换成阿里云KMS产品进行加解密;
121 1
|
1月前
|
消息中间件 Cloud Native Kafka
活动报名|AutoMQ x 阿里云云原生创新论坛(2024.03.09)见证“新一代云原生 Kafka ”重磅发布!
新一年, AutoMQ 首场线下活动重磅来袭!2024年3月9日,由 AutoMQ 与阿里云联合举办的云原生创新论坛将于杭州与大家见面,双方联合重磅发布新一代云原生 Kafka ——AutoMQ On-Prem 版本 !现场将会分享如何通过云原生和存算分离架构实现 Kafka 产品的10倍成本优化,并保持秒级分区无损迁移。另外,活动现场还有来自得物的技术专家分享 AutoMQ 在生产场景中的应用实践,以及阿里云的资深专家为大家剖析多 AZ 块存储的原理。
120 0
活动报名|AutoMQ x 阿里云云原生创新论坛(2024.03.09)见证“新一代云原生 Kafka ”重磅发布!
|
2月前
|
消息中间件 SQL druid
最新版 springboot集成kafka
最新版 springboot集成kafka
24 0
|
2月前
|
对象存储
阿里云oss-cloud-sdk-springboot3兼容问题
阿里云oss-cloud-sdk-springboot3兼容问题
70 0
|
3月前
|
消息中间件 SQL Java
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
|
3月前
|
存储 安全 Java
springboot把图片上传到阿里云OSS
springboot把图片上传到阿里云OSS
190 0
|
25天前
|
Ubuntu JavaScript 关系型数据库
在阿里云Ubuntu 20.04服务器中搭建一个 Ghost 博客
在阿里云Ubuntu 20.04服务器上部署Ghost博客的步骤包括创建新用户、安装Nginx、MySQL和Node.js 18.x。首先,通过`adduser`命令创建非root用户,然后安装Nginx和MySQL。接着,设置Node.js环境,下载Nodesource GPG密钥并安装Node.js 18.x。之后,使用`npm`安装Ghost-CLI,创建Ghost安装目录并进行安装。配置过程中需提供博客URL、数据库连接信息等。最后,测试访问前台首页和后台管理页面。确保DNS设置正确,并根据提示完成Ghost博客的配置。
在阿里云Ubuntu 20.04服务器中搭建一个 Ghost 博客
|
29天前
|
存储 分布式计算 网络协议
阿里云服务器内存型r7、r8a、r8y实例区别参考
在阿里云目前的活动中,属于内存型实例规格的云服务器有内存型r7、内存型r8a、内存型r8y这几个实例规格,相比于活动内的经济型e、通用算力型u1实例来说,这些实例规格等性能更强,与计算型和通用型相比,它的内存更大,因此这些内存型实例规格主要适用于数据库、中间件和数据分析与挖掘,Hadoop、Spark集群等场景,本文为大家介绍内存型r7、r8a、r8y实例区别及最新活动价格,以供参考。
阿里云服务器内存型r7、r8a、r8y实例区别参考

热门文章

最新文章