博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka单线程
阅读量:3522 次
发布时间:2019-05-20

本文共 7828 字,大约阅读时间需要 26 分钟。

package com.zkdj.message.common;/** * 公共常量 * @author */public class Constants {		/**	 * 配置文件属性	 * @author 	 */	public static class Pks{		//kafka地址信息		public static final String KAFKA_SERVERS = "kafka.bootstrap.servers";		public static final String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.group.id";		public static final String KAFKA_TOPIC = "kafka.topic";			}	}
package com.zkdj.kafka.common.config;import java.io.IOException;import java.io.InputStream;import java.util.Properties;import org.apache.log4j.Logger;import com.zkdj.kafka.utils.EmptyUtils;public class ConfigManager {	private static final Logger _log = Logger.getRootLogger();	private static final String DEFAULT_PATH = "config.properties";	private static ConfigManager instance;		private Properties config; 		private ConfigManager(){		config = new Properties();		config(null);	}		public static ConfigManager getInstance(){		if(instance == null){			instance = new ConfigManager();		}		return instance;	}		/**	 * 读取配置文件	 * @param path 配置文件classpath路径下相对地址	 * @return	 */	public void config(String path){		path = EmptyUtils.isEmpty(path) ? DEFAULT_PATH : path;		InputStream in = null;		try {			Properties tmp = new Properties();			in = ConfigManager.class.getClassLoader().getResourceAsStream(path);			tmp.load(in);			config = tmp;		} catch (Exception e) {			_log.error("load config file [" + path + "] error -> " + e);		} finally {			if (in != null) {				try {					in.close();				} catch (IOException e) {					_log.error("close inStream error");				}			}		}	}		/**	 * 读取配置文件	 * @param key 配置信息键	 * @return 配置信息值	 */	public String get(String key){		return config.getProperty(key);	}}
package com.zkdj.kafka.common.config;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.ContainerProperties;import com.zkdj.kafka.common.config.Constants.Pks;/** * @Description: kafka配置类 * @Author:XJ */@Configurationpublic class KafKaConfig {	protected static final ConfigManager config = ConfigManager.getInstance();             /**     * @Description: 生产者的配置     * @Author:     * @return     */    public Map
producerConfigs() { Map
props = new HashMap
(); // 集群的服务器地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(Pks.KAFKA_SERVERS)); // 消息缓存 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 生产者空间不足时,send()被阻塞的时间,默认60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 生产者重试次数 props.put(ProducerConfig.RETRIES_CONFIG, 0); // 指定ProducerBatch(消息累加器中BufferPool中的)可复用大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 生产者会在ProducerBatch被填满或者等待超过LINGER_MS_CONFIG时发送 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // key 和 value 的序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 客户端id props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo"); return props; } /** * @Description: 生产者工厂 * @Author: * @return */ @Bean public ProducerFactory
producerFactory() { return new DefaultKafkaProducerFactory
(producerConfigs()); } /** * @Description: KafkaTemplate * @Author:SXJ * @return */ @Bean public KafkaTemplate
kafkaTemplate() { return new KafkaTemplate
(producerFactory()); } // ------------------------------------------------------------------------------------------------------------ /** * @Description: 消费者配置 * @Author: * @return */ public Map
consumerConfigs() { Map
props = new HashMap
(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(Pks.KAFKA_SERVERS)); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, config.get(Pks.KAFKA_CONSUMER_GROUP_ID)); // 自动位移提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 自动位移提交间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); // 消费组失效超时时间 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); // 位移丢失和位移越界后的恢复起始位置 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // key 和 value 的反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10); return props; } /** * @Description: 消费者工厂 * @Author: * @return */ @Bean public ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } /** * @Description: kafka 监听容器工厂 * @Author: * @return */ @Bean public KafkaListenerContainerFactory
>kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory());;// 关闭ack自动提交偏移 return factory; }}
package com.zkdj.kafka.controller.kafka;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/kafka")	public class KafkaProducer {	    @Autowired	    private KafkaTemplate
kafkaTemplate; @RequestMapping("send") public String send(String name) { ListenableFuture
> future = kafkaTemplate.send("topinfo", name); future.addCallback(new ListenableFutureCallback
>() { @Override public void onSuccess(SendResult
result) {// System.out.println("生产者-发送消息成功:" + result.toString()); } @Override public void onFailure(Throwable ex) { System.out.println("生产者-发送消息失败:" + ex.getMessage()); } }); return "test-ok"; } }
package com.zkdj.kafka.controller.kafka;import java.util.List;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;/** * @Description: kafka消费者 * @Author:sxj */@Componentpublic class KafkaConsumer {    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);        /**     * @Description: 可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开     * @param record     */    @KafkaListener(topics = { "topinfo" })  public void receive(ConsumerRecord
record) { logger.info("消费得到的消息---key: " + record.key()); logger.info("消费得到的消息---value: " + record.value().toString()); }}

 

转载地址:http://syrqj.baihongyu.com/

你可能感兴趣的文章
JavaWeb 使用Cookie实现——显示用户上次访问时间(教材学习笔记)
查看>>
Omap138开发板下以uboot2012.04.01为例分析uboot执行(五)
查看>>
Omap138开发板下以uboot2012.04.01为例分析uboot执行(六)
查看>>
Omap138开发板下以uboot2012.04.01为例分析uboot执行(七)
查看>>
Omap138开发板下以uboot2012.04.01为例分析uboot执行(八)
查看>>
中国大学MOOC—陆军工程大学数据结构MOOC习题集(2018秋)7-3 中位数
查看>>
Java发送邮件 注册成功发送邮件
查看>>
Mybatis的简单使用(增删改查),解决数据库字段名和实体类映射属性名不一致的问题
查看>>
Mybatis配置log4j文件 分页查询(limit,rowBounds)
查看>>
Mysql利用注解进行开发
查看>>
Mybatis一对多查询,多对一查询
查看>>
Spring配置bean.xml文件的头目录模板
查看>>
代理模式之------动态代理
查看>>
Spring实现AOP的三种方式
查看>>
Mybatis-Spring简单的配置和使用,配置事务
查看>>
SpringMVC和Mybatis整合使用的配置文件
查看>>
代码特效插件pycharm
查看>>
python实现tcp客户端从服务端下载文件
查看>>
将字符串 “k:1|k1:2|k2:3|k3:4” 转换成字典{“k”:1,”k1”:2,”k2”:3,”k3”:4}
查看>>
AttributeError: 'tuple' object has no attribute 'decode'
查看>>