Spring boot 1.5.1

2.24.1. 安装 kafka

一下安装仅仅适合开发环境,生产环境请使用这个脚本安装 https://github.com/oscm/shell/tree/master/mq/kafka

cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 /srv/
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka       

启动 Kafka 服务

/srv/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties        

-daemon 表示守护进程方式在后台启动

/srv/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties

停止 Kafka 服务

/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh

2.24.2. maven

      <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>          
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.netkiller</groupId><artifactId>deploy</artifactId><version>0.0.1-SNAPSHOT</version><packaging>war</packaging><name>deploy.netkiller.cn</name><description>Deploy project for Spring Boot</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.1.RELEASE</version><relativePath /> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> --><!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> --><!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> --><!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.session</groupId><artifactId>spring-session-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.webjars</groupId><artifactId>webjars-locator</artifactId></dependency><dependency><groupId>org.webjars</groupId><artifactId>sockjs-client</artifactId><version>1.0.2</version></dependency><dependency><groupId>org.webjars</groupId><artifactId>stomp-websocket</artifactId><version>2.3.3</version></dependency><dependency><groupId>org.webjars</groupId><artifactId>bootstrap</artifactId><version>3.3.7</version></dependency><dependency><groupId>org.webjars</groupId><artifactId>jquery</artifactId><version>3.1.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><dependency><groupId>org.apache.tomcat.embed</groupId><artifactId>tomcat-embed-jasper</artifactId><scope>provided</scope></dependency><dependency><groupId>javax.servlet</groupId><artifactId>jstl</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><!-- <version>2.7</version> --></dependency><dependency><groupId>com.caucho</groupId><artifactId>hessian</artifactId><version>4.0.38</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><mainClass>cn.netkiller.Application</mainClass></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><skip>true</skip></configuration></plugin></plugins></build><repositories><repository><id>spring-snapshots</id><name>Spring Snapshots</name><url>https://repo.spring.io/snapshot</url><snapshots><enabled>true</enabled></snapshots></repository><repository><id>spring-milestones</id><name>Spring Milestones</name><url>https://repo.spring.io/milestone</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><pluginRepositories><pluginRepository><id>spring-snapshots</id><name>Spring Snapshots</name><url>https://repo.spring.io/snapshot</url><snapshots><enabled>true</enabled></snapshots></pluginRepository><pluginRepository><id>spring-milestones</id><name>Spring Milestones</name><url>https://repo.spring.io/milestone</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

2.24.3. Spring boot Application

package cn.netkiller;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
@EnableScheduling
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

2.24.4. EnableKafka

package cn.netkiller.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConsumerConfig {public KafkaConsumerConfig() {// TODO Auto-generated constructor stub}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();factory.setConsumerFactory(consumerFactory());// factory.setConcurrency(1);// factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<String, Object>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return propsMap;}@Beanpublic Listener listener() {return new Listener();}}

2.24.5. KafkaListener

package cn.netkiller.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;public class Listener {public Listener() {// TODO Auto-generated constructor stub}protected Logger logger = Logger.getLogger(Listener.class.getName());public CountDownLatch getCountDownLatch1() {return countDownLatch1;}private CountDownLatch countDownLatch1 = new CountDownLatch(1);@KafkaListener(topics = "test")public void listen(ConsumerRecord<?, ?> record) {logger.info("Received message: " + record.toString());System.out.println("Received message: " + record);countDownLatch1.countDown();}
}           

2.24.6. 测试

$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.

每输入一行回车后发送到你的Spring boot kafka 程序

2.24.7. 完整的发布订阅实例

上面的例子仅仅是做了一个热身,现在我们将实现 一个完整的例子。

2.24.7.1. Consumer

例 2.5. Spring boot with Apache kafka.

SpringApplication

package cn.netkiller;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
//import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
//import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
// @EnableMongoRepositories
// @EnableJpaRepositories
@EnableScheduling
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}               

Consumer configuration

package cn.netkiller.kafka.config;import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import cn.netkiller.kafka.consumer.Consumer;@Configuration
@EnableKafka
public class ConsumerConfiguration {public ConsumerConfiguration() {// TODO Auto-generated constructor stub}@Beanpublic Map<String, Object> consumerConfigs() {HashMap<String, Object> props = new HashMap<>();// list of host:port pairs used for establishing the initial connections// to the Kakfa clusterprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// consumer groups allow a pool of processes to divide the work of// consuming and processing recordsprops.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic Consumer receiver() {return new Consumer();}
}

Consumer

package cn.netkiller.kafka.consumer;
import java.util.concurrent.CountDownLatch;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;public class Consumer {public Consumer() {// TODO Auto-generated constructor stub}private static final Logger logger = LoggerFactory.getLogger(Consumer.class);private CountDownLatch latch = new CountDownLatch(1);@KafkaListener(topics = "helloworld.t")public void receiveMessage(String message) {logger.info("received message='{}'", message);latch.countDown();}public CountDownLatch getLatch() {return latch;}
}

2.24.7.2. Producer

例 2.6. Spring boot with Apache kafka.

Producer configuration

package cn.netkiller.kafka.config;import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import cn.netkiller.kafka.producer.Producer;@Configuration
public class ProducerConfiguration {public ProducerConfiguration() {// TODO Auto-generated constructor stub}@Beanpublic Map<String, Object> producerConfigs() {HashMap<String, Object> props = new HashMap<>();// list of host:port pairs used for establishing the initial connections// to the Kakfa clusterprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// value to block, after which it will throw a TimeoutExceptionprops.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<String, String>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}@Beanpublic Producer sender() {return new Producer();}
}

Producer

package cn.netkiller.kafka.producer;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);/** public Sender() { // TODO Auto-generated constructor stub }*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {// the KafkaTemplate provides asynchronous send methods returning a// FutureListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);// you can register a callback with the listener to receive the result// of the send asynchronouslyfuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {logger.info("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {logger.error("unable to send message='{}'", message, ex);}});// alternatively, to block the sending thread, to await the result,// invoke the future’s get() method}
}               

Controller

package cn.netkiller.web;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;@Controller
@RequestMapping("/test")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(IndexController.class);public KafkaTestController() {// TODO Auto-generated constructor stub}@Autowiredprivate Producer sender;@Autowiredprivate Consumer receiver;@RequestMapping("/ping")@ResponseBodypublic String ping() {String message = "PONG";return message;}@RequestMapping("/kafka/send")@ResponseBodypublic String testReceiver() throws Exception {sender.sendMessage("helloworld.t", "Hello Spring Kafka!");receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);logger.info(receiver.getLatch().getCount() + "");return "OK";}}                

2.24.7.3. Test

例 2.7. Test Spring Kafka

SpringBootTest

package cn.netkiller;
import static org.assertj.core.api.Assertions.assertThat;import java.util.concurrent.TimeUnit;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {public SpringKafkaApplicationTests() {// TODO Auto-generated constructor stub}@Autowiredprivate Producer sender;@Autowiredprivate Consumer receiver;@Testpublic void testReceiver() throws Exception {sender.sendMessage("helloworld.t", "Hello Spring Kafka!");receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);assertThat(receiver.getLatch().getCount()).isEqualTo(0);}
}

                

2.24.8. Spring cloud with Kafka

2.24.8.1. Application 主文件

package schedule;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling
@EnableEurekaClient
@EntityScan("common.domain")
public class Application {public static void main(String[] args) {System.out.println("Service Schedule Starting...");SpringApplication.run(Application.class, args);}
}           

2.24.8.2. 资源配置文件

2.24.8.2.1. application.properties

只需要两行,其余所有配置均放在配置中心。

# ==============================
spring.application.name=schedule
eureka.client.serviceUrl.defaultZone=http://eureka:s3cr3t@172.16.0.10:8761/eureka/
# ==============================

2.24.8.2.2. bootstrap.properties

配置中心服务器相关配置

#spring.application.name=schedule
spring.cloud.config.profile=development
spring.cloud.config.label=master
spring.cloud.config.uri=http://172.16.0.10:8888
management.security.enabled=false
spring.cloud.config.username=cfg
spring.cloud.config.password=s3cr3t

2.24.8.2.3. Git 仓库

在 git 仓库中加入 spring.kafka.bootstrap_servers 配置项

spring.kafka.bootstrap_servers=172.16.0.10:9092

2.24.8.3. 启用 kafka

使用 @EnableKafka 启用 Kafka 不需要其他@Bean等。这个配置文件可以省略,可以将 @EnableKafka 放到 Application.java 中。我还是喜欢独立配置。

package schedule.config;
@Configuration
@EnableKafka
public class KafkaConfiguration {
}

2.24.8.4. 消息发布主程序

package schedule.task;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.RestTemplate;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;import schedule.repository.CmsTrashRepository;
import schedule.repository.ArticleRepository;
import common.domain.Article;
import common.domain.CmsTrash;
import common.pojo.ResponseRestful;@Component
public class CFPushTasks {private static final Logger logger = LoggerFactory.getLogger(CFPushTasks.class);private static final String TOPIC = "test";private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static final ObjectMapper mapper = new ObjectMapper();@Autowiredprivate ArticleRepository articleRepository;@Autowiredprivate CmsTrashRepository cmsTrashRepository;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Value("${cf.cms.site_id}")private int siteId;public CFPushTasks() {}private Date getDate() {Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, -1);Date date = calendar.getTime();return date;}private boolean setPostionDate(String key, Date value) {String cacheKey = String.format("schedule:CFPushTasks:%s", key);String date = simpleDateFormat.format(value);logger.info("setPostion({},{})", cacheKey, date);redisTemplate.opsForValue().set(cacheKey, date);if (value == this.getPostionDate(cacheKey)) {return true;}return false;}private Date getPostionDate(String key) {String cacheKey = String.format("schedule:CFPushTasks:%s", key);Date date = null;if (redisTemplate.hasKey(cacheKey)) {try {date = simpleDateFormat.parse(redisTemplate.opsForValue().get(cacheKey));} catch (ParseException e) {// TODO Auto-generated catch block// e.printStackTrace();logger.warn(e.getMessage());}}logger.debug("getPostion({}) => {}", cacheKey, date);return date;}private boolean setPostionId(String key, int id) {String cacheKey = String.format("schedule:CFPushTasks:PostionId:%s", key);logger.info("setPostionId({},{})", cacheKey, id);redisTemplate.opsForValue().set(cacheKey, String.valueOf(id));if (id == this.getPostionId(cacheKey)) {return true;}return false;}private int getPostionId(String key) {String cacheKey = String.format("schedule:CFPushTasks:PostionId:%s", key);int id = 0;if (redisTemplate.hasKey(cacheKey)) {id = Integer.valueOf(redisTemplate.opsForValue().get(cacheKey));}logger.debug("getPostion({}) => {}", cacheKey, id);return id;}@Scheduled(fixedRate = 1000 * 50)public void insert() {Iterable<Article> articles = null;int id = this.getPostionId("insert");if (id == 0) {articles = articleRepository.findBySiteId(this.siteId);} else {articles = articleRepository.findBySiteIdAndIdGreaterThan(this.siteId, id);}if (articles != null) {for (Article article : articles) {ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("insert"), "INSERT", article);String jsonString;try {jsonString = mapper.writeValueAsString(responseRestful);this.send(TOPIC, jsonString);if (!this.setPostionId("insert", article.getId())) {return;}} catch (JsonProcessingException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}@Scheduled(fixedRate = 1000 * 50)public void update() {String message = "Hello";this.send(TOPIC, message);}@Scheduled(fixedRate = 1000 * 50)public void delete() {Date date = this.getPostionDate("delete");Iterable<CmsTrash> cmsTrashs;if (date == null) {cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeOrderByCtime(this.siteId, "delete");} else {cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeAndCtimeGreaterThanOrderByCtime(this.siteId, "delete", date);}if (cmsTrashs != null) {for (CmsTrash cmsTrash : cmsTrashs) {ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("delete"), "DELETE", cmsTrash);String jsonString;try {jsonString = mapper.writeValueAsString(responseRestful);this.send(TOPIC, jsonString);this.setPostionId("delete", cmsTrash.getId());if (!this.setPostionDate("delete", cmsTrash.getCtime())) {return;} else {}} catch (JsonProcessingException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}private void send(String topic, String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {logger.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {logger.error("unable to send message='{}'", message, ex);}});}private void post(ResponseRestful responseRestful) {RestTemplate restTemplate = new RestTemplate();String response = restTemplate.postForObject("http://localhost:8440/test/cf/post.json", responseRestful, String.class);// logger.info(article.toString());if (response != null) {logger.info(response);}}
}

原文出处:Netkiller 系列 手札
本文作者:陈景峯
转载请与作者联系,同时请务必标明文章原始出处和作者信息及本声明。

2.24. Spring boot with Apache Kafka相关推荐

  1. Spring Boot 和Apache Kafka的集成

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 1. 引言 Apache Kafka 是一个分布式的.容错 ...

  2. 基于Spring Boot应用Apache CXF发布Web Services服务

    记录:298 场景:使用Spring Boot应用Apache CXF发布Web Services服务,实现跨系统之间交互接口. 版本: JDK 1.8 Spring Boot 2.6.3 Apach ...

  3. kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

    文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...

  4. Spring Boot和Apache Camel

    随着软件世界的发展,正在开发更加复杂的系统,这些系统必须相互集成. 它从SOA开始,然后一直到微服务. 骆驼是我想到的第一大集成工具,因为如今的骆驼springboot是一个非常强大的组合. 第一步是 ...

  5. 24. Spring Boot 事务的使用

    转自:https://blog.csdn.net/catoop/article/details/50595702 转载于:https://www.cnblogs.com/sharpest/p/8004 ...

  6. 2. Spring Boot使用Apache Curator实现分布式锁(可重入排它锁)「第四章 ZooKeeper Curator应用场景实战」「架构之路ZooKeeper理论和实战」

    相关历史文章(阅读本文前,您可能需要先看下之前的系列

  7. 3.Spring Boot使用Apache Curator实现leader选举「第四章 ZooKeeper Curator应用场景实战」「架构之路ZooKeeper理论和实战」

    相关历史文章(阅读本文前,您可能需要先看下之前的系列

  8. 面试那点小事,你从未见过的spring boot面试集锦(附详细答案)

    一, 什么是spring boot? 多年来,随着新功能的增加,spring变得越来越复杂.只需访问页面https://spring.io/projects,我们将看到所有在应用程序中使用的不同功能的 ...

  9. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

最新文章

  1. intellij idea 和 myeclipse 转换
  2. Property属性,amp;nbsp;KVC键值编码OC…
  3. Objective-C中一种消息处理方法performSelector: withObject:
  4. 【mysql学习】疑问点记录
  5. android wifi驱动_OTT盒子WiFi方案首选:博通2T2R WiFi模块
  6. SpringBoot 2.x 集成 Redis
  7. NetBeans Weekly News 刊号 # 27 - Sep 24, 2008
  8. -webkit-min-device-pixel-ratio的常见值对照
  9. CVE-2021-21871: PowerISO 内存越界写漏洞
  10. python模板引擎传迭代器_python之路 模块,序列化,迭代器,生成器
  11. as3位图绘制器(矢量器):as3potrace
  12. Python成长笔记 - 基础篇 (七)python面向对象
  13. [ALAPI]免费聚合视频无水印接口分享
  14. 介绍一个很好的英语学习软件——单词风暴
  15. CHD安装Hadoop
  16. 手机QQ浏览器的HTML管理器,手机qq浏览器中文件管理器有哪些功能
  17. 话说丢帧率系列---帧间隙
  18. JAVA实现2048小游戏
  19. JMS入门(一)--JMS基础
  20. 第一次ACM校赛_记录

热门文章

  1. ForkJoinPool 偷任务
  2. 2022春节行为经济学
  3. JS执行Promise
  4. Laravel 中使用事务
  5. Spring框架是如何判断是否是上传文件请求呢
  6. Spring基于注解的方式二
  7. canal+Kafka实现mysql与redis数据同步
  8. Go 的 Contex 是线程安全的吗?
  9. 假如给Go语言加上注解,程序会变怎样?
  10. OSX EI Captain中安装Pear等三方软件不成功的解决方法