1.31.Flink custom RocketMQ (source/sink) + custom Redis + custom
1.31.1.Project structure
1.31.2.Defining the pom.xml file
1.31.3.log4j2.properties
1.31.4.logback.xml
1.31.5.cache.properties
1.31.6.project-config.properties
1.31.7.IssueAcceptSimpleProducer.java
1.31.8.Consumer.java
1.31.9.DefaultTopicSelector.java
1.31.10.SimpleTopicSelector.java
1.31.11.TopicSelector.java
1.31.12.KeyValueDeserializationSchema.java
1.31.13.KeyValueSerializationSchema.java
1.31.14.SimpleKeyValueDeserializationSchema.java
1.31.15.SimpleKeyValueSerializationSchema.java
1.31.16.RocketMQConfig.java
1.31.17.RocketMQSink.java
1.31.18.RocketMQSource.java
1.31.19.RocketMQUtils.java
1.31.20.RunningChecker.java
1.31.21.DateUtils.java
1.31.22.PropertiesUtils.java
1.31.23.RedisUtil.java
1.31.24.IssueConstants.java
1.31.25.IssueAcceptRedisSink.java
1.31.26.IssueAcceptFlinkHandlerByCustomRedisSink.java
1.32.Other Flink examples
1.32.1.Generating data with DataGen
1.32.2.Storing temporary data with value state

1.31.Flink custom RocketMQ (source/sink) + custom Redis + custom

1.31.1.Project structure

1.31.2.Defining the pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- xxxxxx real-time processing -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>xxx.xxx.xxxx</groupId>
    <artifactId>indicators-real-time-handler</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- maven properties -->
        <maven.test.skip>true</maven.test.skip>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!--<rocketmq.version>4.7.1</rocketmq.version>-->
        <rocketmq.version>4.5.1</rocketmq.version>
        <flink.version>1.11.1</flink.version>
        <flink-connector-redis.version>1.0</flink-connector-redis.version>
        <commons-lang.version>2.5</commons-lang.version>
        <scala.binary.version>2.12</scala.binary.version>
        <junit.version>4.12</junit.version>
        <redis.version>3.3.0</redis.version>
        <slf4j.version>1.7.25</slf4j.version>
        <fastjson.version>1.2.73</fastjson.version>
        <joda-time.version>2.9.4</joda-time.version>
        <!--<hadoop.version>2.8.3</hadoop.version>-->
        <!-- Used to connect to the middleware team's Redis -->
        <tmc-version>0.6.2</tmc-version>
        <fileName>issue-handler</fileName>
        <mainClass>com.xxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        -->
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        -->
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
            1. compile  : the default scope; available at runtime and packaged into the jar.
            2. provided : available at compile time; supplied by the runtime environment, so not packaged.
            3. runtime  : not needed for compilation, available at runtime, and packaged
                          (separates interface from implementation).
            4. test     : needed only for tests; not packaged.
            5. system   : a jar at a path on the local system rather than from a repository (rarely used).
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>test</scope>
        </dependency>

        <!--
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        -->
        <!--
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>${flink-connector-redis.version}</version>
        </dependency>
        -->

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>${rocketmq.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>${commons-lang.version}</version>
        </dependency>

        <!-- test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>${junit.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda-time.version}</version>
        </dependency>

        <!-- Dependencies for Scala development: start -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <!-- Dependencies for Scala development: end -->

        <!-- kafka connector scala 2.12 -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        -->
        <!--
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        -->
    </dependencies>

    <distributionManagement>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
        </repository>
        <snapshotRepository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
        </snapshotRepository>
    </distributionManagement>

    <repositories>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>warn</checksumPolicy>
            </snapshots>
        </repository>
        <repository>
            <id>xxxxx</id>
            <name>xxxxxx</name>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/xxxx/</url>
        </repository>
        <repository>
            <id>public</id>
            <name>public</name>
            <url>http://xxx.xxx.xxx/nexus/content/groups/public/</url>
        </repository>
        <!-- Newly added -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <build>
        <finalName>${fileName}</finalName>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                    <compilerVersion>${maven.compiler.source}</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>${maven.test.skip}</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Jar packaging plugin (bundles all dependencies) -->
            <plugin>
                <!--<groupId>org.apache.maven.plugins</groupId>-->
                <artifactId>maven-assembly-plugin</artifactId>
                <!--<version>2.6</version>-->
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!--
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                    -->
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.31.3.log4j2.properties

rootLogger.level = ERROR
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

1.31.4.logback.xml

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Daily rollover -->
            <fileNamePattern>log/generator.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Keep 7 days' worth of history -->
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="ERROR">
        <appender-ref ref="FILE" />
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

1.31.5.cache.properties

# Application appkey
app.key=appKeyTest
# not use memoryCache
# Cache monitoring switch: true/false
monitor.enabled=false
# Test environment
monitor.bootstrap.servers=xxx.xxx.xxx.xxx:9094
synchronize.enabled= false
local.cache.name = none
#localCache config start
# Test environment Redis
remote.cache.servers=redis://xxx.xxx.xxx.xxx:6390,redis://xxx.xxx.xxx.xxx:6391,redis://xxx.xxx.xxx.xxx:6392,redis://xxx.xxx.xxx.xxx:6393,redis://xxx.xxx.xxx.xxx:6394,redis://xxx.xxx.xxx.xxx:6395
remote.cache.mode=cluster
#remote.cache.mode=standalone
remote.cache.password=123456
synchronize.strategy=kafka
synchronize.address=
monitor.address=
monitor.strategy=kafka
namespace=doraemon

1.31.6.project-config.properties

# Name server address of the local RocketMQ
#rocketmq.name.server.addr=localhost:9876
# Development environment RocketMQ
rocketmq.name.server.addr=xxx.xxx.xxx.xxx:9876
# Test environment
# rocketmq.name.server.addr=xxx.xxx.xxx.xxx:9876
rocketmq.topics=issue_sync_message_1##issue_sync_message_2
####################################Flink settings: start###########################################
# Checkpoint interval in milliseconds (120000 ms = 2 minutes)
flink.checkpoint.interval=120000
# Minimum pause between checkpoints in milliseconds (1000 ms = 1 second)
flink.checkpoint.minPauseBetweenCheckpoints=1000
# Checkpoint timeout: a checkpoint must finish within one minute or it is discarded
flink.checkpoint.checkpointTimeout=60000
# Allow only one checkpoint in progress at a time
flink.checkpoint.maxConcurrentCheckpoints=1
# Parallelism of the RocketMQ source
flink.rockeqmq.source.parallelism=1
# Parallelism of the Redis sink
flink.redis.sink.parallelism=1
# Number of restart attempts
flink.fixedDelayRestart.times=3
# Delay between restart attempts
flink.fixedDelayRestart.interval=5
####################################Redis settings: start###########################################
# Default expiration: 10 days (864000 seconds)
redis.default.expiration.time=864000
####################################Redis settings: end  ###########################################
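
The file above is an ordinary Java properties file, so it can be read with `java.util.Properties`. The sketch below is not the project's `PropertiesUtils` (that class appears in a later section); it only illustrates one plausible way to read the values. It assumes that `##` is the separator between topic names in `rocketmq.topics`, and the class and method names (`ProjectConfigSketch`, `splitTopics`, `getLong`) are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the original project.
final class ProjectConfigSketch {

    // Assumption: topic names in rocketmq.topics are separated by "##".
    static List<String> splitTopics(String raw) {
        return Arrays.asList(raw.trim().split("##"));
    }

    // Reads a long-valued property, falling back to a default when the key is absent.
    static long getLong(Properties props, String key, long defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    // Parses properties from text; a real application would read the file from the classpath.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(
                "rocketmq.topics=issue_sync_message_1##issue_sync_message_2\n"
                        + "flink.checkpoint.interval=120000\n");
        System.out.println(splitTopics(props.getProperty("rocketmq.topics")));
        System.out.println(getLong(props, "flink.checkpoint.interval", 60000L));
    }
}
```

In a properties file `#` starts a comment only at the beginning of a line, so the `##` inside the value is read as part of the value.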

1.31.7.IssueAcceptSimpleProducer.java

package com.xxxxx.issue.producer;

import com.alibaba.fastjson.JSON;
import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
import com.xxxxx.doraemon.service.issue.domain.IssueSyncMessageBody;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import com.xxxxx.issue.utils.PropertiesUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * Commands to start RocketMQ:
 * .\bin\mqnamesrv.cmd
 * .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
 *
 * @author tuzuoquan
 * @version 1.0
 * @ClassName IssueAcceptSimpleProducer
 * @description TODO
 * @date 2020/9/14 15:29
 **/
public class IssueAcceptSimpleProducer {

    private static Logger LOG = LoggerFactory.getLogger(IssueAcceptSimpleProducer.class);

    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("p003");
        producer.setNamesrvAddr("localhost:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            // Nothing can be sent if the producer failed to start.
            return;
        }

        Long i = 1000L;
        while (i <= 100000000L) {
            IssueSyncMessageBody body = new IssueSyncMessageBody();
            body.setIssueId(i);
            body.setSerialNumber("" + i);
            body.setCreateDate(new Date());

            Long flag = i % 10;
            String userName = "user" + flag;
            String mobile = "1981715866" + flag;
            body.setHandleUserName(userName);
            body.setHandleMobile(mobile);

            String tenantId = i % 10 + "";
            body.setTenantId(tenantId);

            // Build an org code with one to seven dot-separated levels, depending on i % 7.
            String oneLevel = i % 32 + "";
            String twoLevel = i % 22 + "";
            String threeLevel = i % 30 + "";
            String fourLevel = i % 15 + "";
            String fiveLevel = i % 25 + "";
            String sixLevel = i % 5 + "";
            String sevenLevel = i % 20 + "";

            Long tmp = i % 7;
            String handlerOrgCode = null;
            if (tmp.compareTo(0L) == 0) {
                handlerOrgCode = oneLevel;
            } else if (tmp.compareTo(1L) == 0) {
                handlerOrgCode = oneLevel + "." + twoLevel;
            } else if (tmp.compareTo(2L) == 0) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel;
            } else if (tmp.compareTo(3L) == 0) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel;
            } else if (tmp.compareTo(4L) == 0) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel + "." + fiveLevel;
            } else if (tmp.compareTo(5L) == 0) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel + "." + fiveLevel
                        + "." + sixLevel;
            } else if (tmp.compareTo(6L) == 0) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel + "." + fiveLevel
                        + "." + sixLevel + "." + sevenLevel;
            }
            body.setHandleOrgCode(handlerOrgCode);

            // 1. Accepted count
            String tag = "issue_accept_operat";
            Integer operatType = 61;

            // 2. Number of events that occurred:
            // TAG:issue_accept_operat    operatType:61 +
            // TAG:issue_add_operat   operatType:2 -
            // TAG:issue_delete_operat    operatType:0   org_code: region scope  createDate: today

            // 3. Completed count:
            // TAG:issue_inspect_pass_operat    operatType:30 +
            // TAG:issue_complete_operat    operatType:31

            IssueSyncMessageBodyVO issueSyncMessageBodyVO = new IssueSyncMessageBodyVO();
            issueSyncMessageBodyVO.setBody(body);
            issueSyncMessageBodyVO.setOperatType(operatType);

            //TQMessage msg = new TQMessage("issue_sync_message", "issue_add_operat", ObjectConvertUtil.objectToByte(issueSyncMessageBodyVO));
            Message msg = new Message("issue_sync_message_1", tag, "id_" + i,
                    ObjectConvertUtil.objectToByte(issueSyncMessageBodyVO));
            //Message msg = new Message("issue_sync_message5", "issue_add_operat", "id_" + i, JSON.toJSONString(issueSyncMessageBodyVO).getBytes());

            try {
                producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOG.info("send :" + i + " content: " + JSON.toJSONString(issueSyncMessageBodyVO));

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
        }

        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
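
The `if`/`else if` chain in the producer builds an org code with `(i % 7) + 1` dot-separated levels, where each level is `i` modulo a fixed number. As a minimal sketch of the same rule, the loop below produces the same strings for non-negative `i`; the class name `OrgCodeSketch` and the method `buildOrgCode` are hypothetical and not part of the original project.

```java
import java.util.StringJoiner;

// Hypothetical helper that restates the producer's org-code rule as a loop.
final class OrgCodeSketch {

    // Moduli for levels one through seven, copied from the producer.
    private static final int[] LEVEL_MODULI = {32, 22, 30, 15, 25, 5, 20};

    // Message i gets (i % 7) + 1 levels; level k contributes i % LEVEL_MODULI[k].
    // Assumes i is non-negative, as in the producer (which starts at 1000).
    static String buildOrgCode(long i) {
        int depth = (int) (i % 7) + 1;
        StringJoiner joiner = new StringJoiner(".");
        for (int level = 0; level < depth; level++) {
            joiner.add(Long.toString(i % LEVEL_MODULI[level]));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildOrgCode(1000L)); // seven levels, since 1000 % 7 == 6
        System.out.println(buildOrgCode(1001L)); // one level, since 1001 % 7 == 0
    }
}
```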

1.31.8.Consumer.java

package com.xxxxx.issue.consumer;

import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("abcdefsssss");

        // Specify name server addresses.
        //consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
        consumer.setNamesrvAddr("localhost:9876");
        //consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");

        // Subscribe to one or more topics to consume.
        //consumer.subscribe(PropertiesUtils.getInstance().getRocketMqTopic_1(), "issue_accept_operat || issue_add_operat || issue_delete_operat");
        //consumer.subscribe(PropertiesUtils.getInstance().getRocketMqTopic_1(), "*");
        consumer.subscribe("issue_sync_message_1", "*");
        consumer.subscribe("issue_sync_message_2", "*");

        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    IssueSyncMessageBodyVO issueSyncMessageBodyVO =
                            (IssueSyncMessageBodyVO) ObjectConvertUtil.byteToObject(msg.getBody());
                    System.out.println(issueSyncMessageBodyVO.getBody());
                    //byte[] value = JSON.toJSONString(issueSyncMessageBodyVO).getBytes();
                    //System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), value.toString());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

1.31.9.DefaultTopicSelector.java

package org.apache.rocketmq.flink.common.selector;

public class DefaultTopicSelector<T> implements TopicSelector<T> {

    private final String topicName;
    private final String tagName;

    public DefaultTopicSelector(final String topicName, final String tagName) {
        this.topicName = topicName;
        this.tagName = tagName;
    }

    public DefaultTopicSelector(final String topicName) {
        this(topicName, "");
    }

    @Override
    public String getTopic(T tuple) {
        return topicName;
    }

    @Override
    public String getTag(T tuple) {
        return tagName;
    }
}

1.31.10.SimpleTopicSelector.java

package org.apache.rocketmq.flink.common.selector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SimpleTopicSelector implements TopicSelector<Map> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class);

    private final String topicFieldName;
    private final String defaultTopicName;
    private final String tagFieldName;
    private final String defaultTagName;

    /**
     * SimpleTopicSelector Constructor.
     * @param topicFieldName field name used for selecting topic
     * @param defaultTopicName topic name returned when the topic field is missing or null
     * @param tagFieldName field name used for selecting tag
     * @param defaultTagName tag name returned when the tag field is missing or null
     */
    public SimpleTopicSelector(String topicFieldName, String defaultTopicName,
                               String tagFieldName, String defaultTagName) {
        this.topicFieldName = topicFieldName;
        this.defaultTopicName = defaultTopicName;
        this.tagFieldName = tagFieldName;
        this.defaultTagName = defaultTagName;
    }

    @Override
    public String getTopic(Map tuple) {
        if (tuple.containsKey(topicFieldName)) {
            Object topic = tuple.get(topicFieldName);
            return topic != null ? topic.toString() : defaultTopicName;
        } else {
            LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName);
            return defaultTopicName;
        }
    }

    @Override
    public String getTag(Map tuple) {
        if (tuple.containsKey(tagFieldName)) {
            Object tag = tuple.get(tagFieldName);
            return tag != null ? tag.toString() : defaultTagName;
        } else {
            LOG.warn("Field {} Not Found. Returning default tag {}", tagFieldName, defaultTagName);
            return defaultTagName;
        }
    }
}

1.31.11.TopicSelector.java

package org.apache.rocketmq.flink.common.selector;

import java.io.Serializable;

public interface TopicSelector<T> extends Serializable {

    String getTopic(T tuple);

    String getTag(T tuple);
}
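
`TopicSelector` has only two methods, so a custom routing rule is a small class. The following is a minimal sketch of a selector that sends every record to one topic and derives the tag from an operation type. The tag/`operatType` pairs come from the comments in `IssueAcceptSimpleProducer`; the `IssueEvent` type, the class names, and the empty-string fallback tag are assumptions made for illustration, and the interface is copied locally only so the sketch compiles without the RocketMQ classes.

```java
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical example; not part of the original project.
final class OperatTypeTagSelectorSketch {

    // Local copy of the TopicSelector interface, so the sketch is self-contained.
    interface TopicSelector<T> extends Serializable {
        String getTopic(T tuple);
        String getTag(T tuple);
    }

    // Hypothetical event type; only the operatType field matters here.
    static final class IssueEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        final int operatType;

        IssueEvent(int operatType) {
            this.operatType = operatType;
        }
    }

    // Routes all events to one topic and picks the tag from operatType.
    static final class OperatTypeTagSelector implements TopicSelector<IssueEvent> {
        private static final long serialVersionUID = 1L;
        private final String topicName;
        private final HashMap<Integer, String> tagsByType = new HashMap<>();

        OperatTypeTagSelector(String topicName) {
            this.topicName = topicName;
            // Pairs taken from the comments in IssueAcceptSimpleProducer.
            tagsByType.put(61, "issue_accept_operat");
            tagsByType.put(2, "issue_add_operat");
            tagsByType.put(0, "issue_delete_operat");
            tagsByType.put(30, "issue_inspect_pass_operat");
            tagsByType.put(31, "issue_complete_operat");
        }

        @Override
        public String getTopic(IssueEvent event) {
            return topicName;
        }

        @Override
        public String getTag(IssueEvent event) {
            // Assumption: unknown types fall back to an empty tag, like DefaultTopicSelector.
            return tagsByType.getOrDefault(event.operatType, "");
        }
    }

    public static void main(String[] args) {
        TopicSelector<IssueEvent> selector = new OperatTypeTagSelector("issue_sync_message_1");
        IssueEvent event = new IssueEvent(61);
        System.out.println(selector.getTopic(event) + " / " + selector.getTag(event));
    }
}
```

The selector and its fields are `Serializable` because Flink ships sink functions, including the selector they hold, to the task managers.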

1.31.12.KeyValueDeserializationSchema.java

package org.apache.rocketmq.flink.common.serialization;

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.io.Serializable;

public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {

    T deserializeKeyAndValue(byte[] key, byte[] value);
}

1.31.13.KeyValueSerializationSchema.java

package org.apache.rocketmq.flink.common.serialization;

import java.io.Serializable;

public interface KeyValueSerializationSchema<T> extends Serializable {

    byte[] serializeKey(T tuple);

    byte[] serializeValue(T tuple);
}

1.31.14.SimpleKeyValueDeserializationSchema.java

package org.apache.rocketmq.flink.common.serialization;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {

    public static final String DEFAULT_KEY_FIELD = "key";
    public static final String DEFAULT_VALUE_FIELD = "value";

    public String keyField;
    public String valueField;

    public SimpleKeyValueDeserializationSchema() {
        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
    }

    /**
     * SimpleKeyValueDeserializationSchema Constructor.
     * @param keyField tuple field for selecting the key
     * @param valueField tuple field for selecting the value
     */
    public SimpleKeyValueDeserializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    @Override
    public Map deserializeKeyAndValue(byte[] key, byte[] value) {
        HashMap map = new HashMap(2);
        if (keyField != null) {
            String k = key != null ? new String(key, StandardCharsets.UTF_8) : null;
            map.put(keyField, k);
        }
        if (valueField != null) {
            String v = value != null ? new String(value, StandardCharsets.UTF_8) : null;
            map.put(valueField, v);
        }
        return map;
    }

    @Override
    public TypeInformation<Map> getProducedType() {
        return TypeInformation.of(Map.class);
    }
}

1.31.15.SimpleKeyValueSerializationSchema.java

package org.apache.rocketmq.flink.common.serialization;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class SimpleKeyValueSerializationSchema implements KeyValueSerializationSchema<Map> {

    public static final String DEFAULT_KEY_FIELD = "key";
    public static final String DEFAULT_VALUE_FIELD = "value";

    public String keyField;
    public String valueField;

    public SimpleKeyValueSerializationSchema() {
        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
    }

    /**
     * SimpleKeyValueSerializationSchema Constructor.
     * @param keyField tuple field for selecting the key
     * @param valueField tuple field for selecting the value
     */
    public SimpleKeyValueSerializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    @Override
    public byte[] serializeKey(Map tuple) {
        if (tuple == null || keyField == null) {
            return null;
        }
        Object key = tuple.get(keyField);
        return key != null ? key.toString().getBytes(StandardCharsets.UTF_8) : null;
    }

    @Override
    public byte[] serializeValue(Map tuple) {
        if (tuple == null || valueField == null) {
            return null;
        }
        Object value = tuple.get(valueField);
        return value != null ? value.toString().getBytes(StandardCharsets.UTF_8) : null;
    }
}
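
The two simple schemas are inverses of each other: one turns the `key` and `value` fields of a `Map` into UTF-8 bytes, the other turns the bytes back into a `Map`. The stand-alone sketch below restates that logic without the Flink and RocketMQ types so the round trip can be tried directly; the class `KeyValueRoundTripSketch` and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone restatement of the two simple schemas.
final class KeyValueRoundTripSketch {

    // Mirrors serializeKey/serializeValue: field -> UTF-8 bytes, or null when absent.
    static byte[] serializeField(Map<String, Object> tuple, String field) {
        if (tuple == null || field == null) {
            return null;
        }
        Object v = tuple.get(field);
        return v != null ? v.toString().getBytes(StandardCharsets.UTF_8) : null;
    }

    // Mirrors deserializeKeyAndValue with the default field names "key" and "value".
    static Map<String, String> deserialize(byte[] key, byte[] value) {
        Map<String, String> map = new HashMap<>(2);
        map.put("key", key != null ? new String(key, StandardCharsets.UTF_8) : null);
        map.put("value", value != null ? new String(value, StandardCharsets.UTF_8) : null);
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("key", "id_1000");
        tuple.put("value", "{\"issueId\":1000}");

        byte[] keyBytes = serializeField(tuple, "key");
        byte[] valueBytes = serializeField(tuple, "value");
        System.out.println(deserialize(keyBytes, valueBytes));
    }
}
```

Note that values pass through `toString()`, so a non-string value comes back as its string form rather than its original type.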

1.31.16.RocketMQConfig.java

package org.apache.rocketmq.flink;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.Properties;
import java.util.UUID;

import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;

/**
 * RocketMQConfig for Consumer/Producer.
 */
public class RocketMQConfig {

    // Server Config
    public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required

    public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds

    public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds

    // Producer related config
    public static final String PRODUCER_GROUP = "producer.group";

    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;

    public static final String PRODUCER_TIMEOUT = "producer.timeout";
    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds

    public static final String ACCESS_KEY = "access.key";
    public static final String SECRET_KEY = "secret.key";

    // Consumer related config
    public static final String CONSUMER_GROUP = "consumer.group"; // Required
    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required

    public static final String CONSUMER_TAG = "consumer.tag";
    public static final String DEFAULT_CONSUMER_TAG = "*";

    public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
    public static final String CONSUMER_OFFSET_LATEST = "latest";
    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
    public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp";

    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds

    public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size";
    public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20;

    public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
    public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;

    public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
    public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;

    public static final String MSG_DELAY_LEVEL = "msg.delay.level";
    public static final int MSG_DELAY_LEVEL00 = 0; // no delay
    public static final int MSG_DELAY_LEVEL01 = 1; // 1s
    public static final int MSG_DELAY_LEVEL02 = 2; // 5s
    public static final int MSG_DELAY_LEVEL03 = 3; // 10s
    public static final int MSG_DELAY_LEVEL04 = 4; // 30s
    public static final int MSG_DELAY_LEVEL05 = 5; // 1min
    public static final int MSG_DELAY_LEVEL06 = 6; // 2min
    public static final int MSG_DELAY_LEVEL07 = 7; // 3min
    public static final int MSG_DELAY_LEVEL08 = 8; // 4min
    public static final int MSG_DELAY_LEVEL09 = 9; // 5min
    public static final int MSG_DELAY_LEVEL10 = 10; // 6min
    public static final int MSG_DELAY_LEVEL11 = 11; // 7min
    public static final int MSG_DELAY_LEVEL12 = 12; // 8min
    public static final int MSG_DELAY_LEVEL13 = 13; // 9min
    public static final int MSG_DELAY_LEVEL14 = 14; // 10min
    public static final int MSG_DELAY_LEVEL15 = 15; // 20min
    public static final int MSG_DELAY_LEVEL16 = 16; // 30min
    public static final int MSG_DELAY_LEVEL17 = 17; // 1h
    public static final int MSG_DELAY_LEVEL18 = 18; // 2h

    /**
     * Build Producer Configs.
     * @param props Properties
     * @param producer DefaultMQProducer
     */
    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
        buildCommonConfigs(props, producer);

        String group = props.getProperty(PRODUCER_GROUP);
        if (StringUtils.isEmpty(group)) {
            group = UUID.randomUUID().toString();
        }
        producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));

        producer.setRetryTimesWhenSendFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
        producer.setSendMsgTimeout(getInteger(props,
                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
    }

    /**
     * Build Consumer Configs.
     * @param props Properties
     * @param consumer DefaultMQPullConsumer
     */
    public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
        buildCommonConfigs(props, consumer);

        consumer.setMessageModel(MessageModel.CLUSTERING);

        consumer.setPersistConsumerOffsetInterval(getInteger(props,
                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
    }

    /**
     * Build Common Configs.
     * @param props Properties
     * @param client ClientConfig
     */
    public static void buildCommonConfigs(Properties props, ClientConfig client) {
        String nameServers = props.getProperty(NAME_SERVER_ADDR);
        Validate.notEmpty(nameServers);
        client.setNamesrvAddr(nameServers);

        client.setPollNameServerInterval(getInteger(props,
                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
        client.setHeartbeatBrokerInterval(getInteger(props,
                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
    }

    /**
     * Build credentials for client.
     * @param props Properties holding the access key and secret key
     * @return an AclClientRPCHook, or null when either key is missing
     */
    public static AclClientRPCHook buildAclRPCHook(Properties props) {
        String accessKey = props.getProperty(ACCESS_KEY);
        String secretKey = props.getProperty(SECRET_KEY);
        if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
            AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
            return aclClientRPCHook;
        }
        return null;
    }
}
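
The `MSG_DELAY_LEVEL` constants are indexes, not durations: level 3 means "10s" only because that is what the comment next to the constant says. As a small illustration, the sketch below maps each level to the delay given in those comments and clamps out-of-range levels to the range 0 to 18, which is the same clamping `RocketMQSink` applies in its constructor. The class `DelayLevelSketch` and its methods are hypothetical, and the durations assume the broker uses the delays listed in the comments.

```java
import java.time.Duration;

// Hypothetical helper; durations are taken from the comments on MSG_DELAY_LEVEL00..18.
final class DelayLevelSketch {

    private static final Duration[] DELAYS = {
            Duration.ZERO,                                   // level 0: no delay
            Duration.ofSeconds(1), Duration.ofSeconds(5),    // levels 1-2
            Duration.ofSeconds(10), Duration.ofSeconds(30),  // levels 3-4
            Duration.ofMinutes(1), Duration.ofMinutes(2),    // levels 5-6
            Duration.ofMinutes(3), Duration.ofMinutes(4),    // levels 7-8
            Duration.ofMinutes(5), Duration.ofMinutes(6),    // levels 9-10
            Duration.ofMinutes(7), Duration.ofMinutes(8),    // levels 11-12
            Duration.ofMinutes(9), Duration.ofMinutes(10),   // levels 13-14
            Duration.ofMinutes(20), Duration.ofMinutes(30),  // levels 15-16
            Duration.ofHours(1), Duration.ofHours(2)         // levels 17-18
    };

    // Clamps a configured level into the supported range [0, 18].
    static int clamp(int level) {
        return Math.max(0, Math.min(DELAYS.length - 1, level));
    }

    // Returns the delay for a level after clamping.
    static Duration delayFor(int level) {
        return DELAYS[clamp(level)];
    }

    public static void main(String[] args) {
        System.out.println(delayFor(3));
        System.out.println(delayFor(99));
    }
}
```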

1.31.17.RocketMQSink.java

package org.apache.rocketmq.flink;

import org.apache.commons.lang.Validate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.flink.common.selector.TopicSelector;
import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

/**
 * The RocketMQSink provides at-least-once reliability guarantees when
 * checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
 * Otherwise, the sink's reliability guarantees depend on the RocketMQ producer's retry policy.
 */
public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class);

    private transient DefaultMQProducer producer;
    // false by default
    private boolean async;

    private Properties props;
    private TopicSelector<IN> topicSelector;
    private KeyValueSerializationSchema<IN> serializationSchema;

    // false by default
    private boolean batchFlushOnCheckpoint;
    private int batchSize = 1000;
    private List<Message> batchList;

    private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;

    public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
        this.serializationSchema = schema;
        this.topicSelector = topicSelector;
        this.props = props;

        if (this.props != null) {
            // Read the configured delay level and clamp it into [MSG_DELAY_LEVEL00, MSG_DELAY_LEVEL18].
            this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
                    RocketMQConfig.MSG_DELAY_LEVEL00);
            if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) {
                this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
            } else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) {
                this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18;
            }
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Validate.notEmpty(props, "Producer properties can not be empty");
        Validate.notNull(topicSelector, "TopicSelector can not be null");
        Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");

        producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
        producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
        RocketMQConfig.buildProducerConfigs(props, producer);

        batchList = new LinkedList<>();

        if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            batchFlushOnCheckpoint = false;
        }

        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void invoke(IN input, Context context) throws Exception {
        Message msg = prepareMessage(input);

        // Batch mode: buffer the message and flush once batchSize is reached.
        if (batchFlushOnCheckpoint) {
            batchList.add(msg);
            if (batchList.size() >= batchSize) {
                flushSync();
            }
            return;
        }

        if (async) {
            try {
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        LOG.debug("Async send message success! result: {}", sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        if (throwable != null) {
                            LOG.error("Async send message failure!", throwable);
                        }
                    }
                });
            } catch (Exception e) {
                LOG.error("Async send message failure!", e);
            }
        } else {
            try {
                SendResult result = producer.send(msg);
                LOG.debug("Sync send message result: {}", result);
                if (result.getSendStatus() != SendStatus.SEND_OK) {
                    throw new RemotingException(result.toString());
                }
            } catch (Exception e) {
                LOG.error("Sync send message failure!", e);
                throw e;
            }
        }
    }

    private Message prepareMessage(IN input) {
        String topic = topicSelector.getTopic(input);
        String tag = (tag = topicSelector.getTag(input)) != null ? tag : "";

        byte[] k = serializationSchema.serializeKey(input);
        String key = k != null ? new String(k, StandardCharsets.UTF_8) : "";
        byte[] value = serializationSchema.serializeValue(input);

        Validate.notNull(topic, "the message topic is null");
        Validate.notNull(value, "the message body is null");

        Message msg = new Message(topic, tag, key, value);
        if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
            msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
        }
        return msg;
    }

    public RocketMQSink<IN> withAsync(boolean async) {
        this.async = async;
        return this;
    }

    public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) {
        this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
        return this;
    }

    public RocketMQSink<IN> withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    @Override
    public void close() throws Exception {
        if (producer != null) {
            try {
                flushSync();
            } catch (Exception e) {
                LOG.error("FlushSync failure!", e);
            }
            // make sure producer can be shutdown, thus current producerGroup will be unregistered
            producer.shutdown();
        }
    }

    private void flushSync() throws Exception {
        if (batchFlushOnCheckpoint) {
            synchronized (batchList) {
                if (batchList.size() > 0) {
                    producer.send(batchList);
                    batchList.clear();
                }
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        flushSync();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Nothing to do
    }
}
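
The batching path of RocketMQSink (buffer each record, flush when batchSize is reached, and flush again when a checkpoint is taken or the sink closes) can be sketched without Flink or RocketMQ on the classpath. This is only an illustrative sketch: `BatchBuffer`, `BatchBufferDemo`, and the `Consumer<List<String>>` sender are hypothetical stand-ins for the sink and for `producer.send(batchList)`, not classes from this project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the sink's batch buffer; not part of the original project. */
class BatchBuffer {
    private final int batchSize;
    private final Consumer<List<String>> sender; // stands in for producer.send(batchList)
    private final List<String> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<String>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Mirrors invoke(): buffer the record and flush once the batch is full. */
    synchronized void add(String record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Mirrors snapshotState() and close(): send whatever is still buffered. */
    synchronized void flush() {
        if (!pending.isEmpty()) {
            sender.accept(new ArrayList<>(pending)); // copy, so the receiver owns the batch
            pending.clear();
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}

class BatchBufferDemo {
    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        BatchBuffer buffer = new BatchBuffer(3, sent::add);

        for (int i = 0; i < 7; i++) {
            buffer.add("msg-" + i);
        }
        System.out.println(sent.size() + " batches sent, " + buffer.pendingCount() + " pending");

        buffer.flush(); // what a checkpoint would trigger
        System.out.println(sent.size() + " batches sent, " + buffer.pendingCount() + " pending");
    }
}
```

In this sketch a record that is still buffered has not been handed to the sender, which is why the real sink has to flush in snapshotState() before a checkpoint can be treated as covering those records.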

1.31.18.RocketMQSource.java

package org.apache.rocketmq.flink;

import com.alibaba.fastjson.JSON;
import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.rocketmq.flink.RocketMQConfig.*;
import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
import static org.apache.rocketmq.flink.RocketMQUtils.getLong;

/**
 * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when
 * checkpoints are enabled. Otherwise, the source doesn't provide any reliability guarantees.
 */
public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);

    private transient MQPullConsumerScheduleService pullConsumerScheduleService;
    private DefaultMQPullConsumer consumer;

    private KeyValueDeserializationSchema<OUT> schema;

    private RunningChecker runningChecker;

    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    private Map<MessageQueue, Long> offsetTable;
    private Map<MessageQueue, Long> restoredOffsets;
    /** Data for pending but uncommitted offsets. */
    private LinkedMap pendingOffsetsToCommit;

    private Properties props;
    private String topic;
    private String group;

    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    private transient volatile boolean restored;
    private transient boolean enableCheckpoint;

    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");

        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }

        runningChecker = new RunningChecker();

        //Wait for lite pull consumer
        //pullConsumerScheduleService = new MQPullConsumerScheduleService(group, RocketMQConfig.buildAclRPCHook(props));
        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
        RocketMQConfig.buildConsumerConfigs(props, consumer);
    }

    @Override
    public void run(SourceContext context) throws Exception {
        LOG.debug("source run....");
        // The lock that guarantees that record emission and state updates are atomic,
        // from the view of taking a checkpoint.
        final Object lock = context.getCheckpointLock();

        int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
                RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);

        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);

        int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
                RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);

        int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
                RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);

        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
        pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
                try {
                    long offset = getMessageQueueOffset(mq);
                    if (offset < 0) {
                        return;
                    }

                    PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                    boolean found = false;
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messages = pullResult.getMsgFoundList();
                            for (MessageExt msg : messages) {
                                byte[] key = msg.getKeys() != null
                                        ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                //byte[] value = msg.getBody();
                                //IssueSyncMessageBodyVO bodyVO = JSON.parseObject(new String(value),IssueSyncMessageBodyVO.class);
                                // The body is a serialized Java object; convert it to a JSON string
                                // before handing it to the deserialization schema.
                                IssueSyncMessageBodyVO issueSyncMessageBodyVO =
                                        (IssueSyncMessageBodyVO) ObjectConvertUtil.byteToObject(msg.getBody());
                                //LOG.info(JSON.toJSONString("issueSyncMessageBodyVO = " + issueSyncMessageBodyVO));
                                byte[] value = JSON.toJSONString(issueSyncMessageBodyVO)
                                        .getBytes(StandardCharsets.UTF_8);
                                OUT data = schema.deserializeKeyAndValue(key, value);

                                // output and state update are atomic
                                synchronized (lock) {
                                    context.collectWithTimestamp(data, msg.getBornTimestamp());
                                }
                            }
                            found = true;
                            break;
                        case NO_MATCHED_MSG:
                            LOG.debug("No matched message after offset {} for queue {}", offset, mq);
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            LOG.warn("Offset {} is illegal for queue {}", offset, mq);
                            break;
                        default:
                            break;
                    }

                    synchronized (lock) {
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    }

                    if (found) {
                        pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found
                    } else {
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });

        try {
            pullConsumerScheduleService.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        runningChecker.setRunning(true);

        awaitTermination();
    }

    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }

    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
        Long offset = offsetTable.get(mq);
        // restoredOffsets(unionOffsetStates) is the restored global union state;
        // should only snapshot mqs that actually belong to us
        if (restored && offset == null) {
            offset = restoredOffsets.get(mq);
        }
        if (offset == null) {
            offset = consumer.fetchConsumeOffset(mq, false);
            if (offset < 0) {
                String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                switch (initialOffset) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq, getLong(props,
                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        offsetTable.put(mq, offset);
        return offsetTable.get(mq);
    }

    private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        // Without checkpointing, offsets are committed immediately.
        if (!enableCheckpoint) {
            consumer.updateConsumeOffset(mq, offset);
        }
    }

    @Override
    public void cancel() {
        LOG.debug("cancel ...");
        runningChecker.setRunning(false);

        if (pullConsumerScheduleService != null) {
            pullConsumerScheduleService.shutdown();
        }
        if (offsetTable != null) {
            offsetTable.clear();
        }
        if (restoredOffsets != null) {
            restoredOffsets.clear();
        }
        if (pendingOffsetsToCommit != null) {
            pendingOffsetsToCommit.clear();
        }
    }

    @Override
    public void close() throws Exception {
        LOG.debug("close ...");
        // pretty much the same logic as cancelling
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called when a snapshot for a checkpoint is requested
        if (!runningChecker.isRunning()) {
            LOG.debug("snapshotState() called on closed source; returning null.");
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
        }

        unionOffsetStates.clear();

        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());

        // remove the unassigned queues in order to avoid reading the wrong offset when the source restarts
        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));

        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }

        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                    offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // called every time the user-defined function is initialized,
        // be that when the function is first initialized or be that
        // when the function is actually recovering from an earlier checkpoint.
        // Given this, initializeState() is not only the place where different types of state are initialized,
        // but also where state recovery logic is included.
        LOG.debug("initialize State ...");

        this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
                OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
                })));

        this.restored = context.isRestored();

        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            // Keep the largest offset seen for each queue in the union state.
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                }
            }
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
        } else {
            LOG.info("No restore state for the consumer.");
        }
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // callback when checkpoint complete
        if (!runningChecker.isRunning()) {
            LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
            return;
        }

        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
            return;
        }

        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);

        // remove older checkpoints in map
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }

        if (offsets == null || offsets.size() == 0) {
            LOG.debug("Checkpoint state was empty.");
            return;
        }

        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
        }
    }
}
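
The interplay between snapshotState() and notifyCheckpointComplete() is easier to follow in isolation: each checkpoint stores a copy of the current offsets under its id, and only when that checkpoint is confirmed are the offsets committed and all older pending entries dropped. The following is a minimal sketch under simplifying assumptions: `PendingOffsets` is a hypothetical class, queues are plain strings instead of MessageQueue, a `LinkedHashMap` replaces commons-collections `LinkedMap`, and the `committed` map stands in for `consumer.updateConsumeOffset`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of pendingOffsetsToCommit; not part of the original project. */
class PendingOffsets {
    // checkpointId -> snapshot of (queue -> next offset), kept in insertion order
    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();
    // stands in for the offsets handed to consumer.updateConsumeOffset(...)
    private final Map<String, Long> committed = new HashMap<>();

    /** Mirrors snapshotState(): remember the offsets that belong to this checkpoint. */
    void snapshot(long checkpointId, Map<String, Long> currentOffsets) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /** Mirrors notifyCheckpointComplete(): commit this checkpoint and drop older ones. */
    boolean checkpointComplete(long checkpointId) {
        if (!pending.containsKey(checkpointId)) {
            return false; // confirmation for an unknown checkpoint id
        }
        Iterator<Map.Entry<Long, Map<String, Long>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            long id = entry.getKey();
            Map<String, Long> offsets = entry.getValue();
            it.remove(); // older entries and the confirmed one are both discarded
            if (id == checkpointId) {
                committed.putAll(offsets);
                break;
            }
        }
        return true;
    }

    Long committedOffset(String queue) {
        return committed.get(queue);
    }

    int pendingCount() {
        return pending.size();
    }
}

class PendingOffsetsDemo {
    public static void main(String[] args) {
        PendingOffsets state = new PendingOffsets();
        state.snapshot(1L, Collections.singletonMap("q0", 10L));
        state.snapshot(2L, Collections.singletonMap("q0", 20L));
        state.snapshot(3L, Collections.singletonMap("q0", 30L));

        state.checkpointComplete(2L);
        System.out.println("committed q0 = " + state.committedOffset("q0")
                + ", pending checkpoints = " + state.pendingCount());
    }
}
```

Committing only on confirmation means the offsets stored in RocketMQ never run ahead of a completed checkpoint, at the cost of re-reading some messages after a failure.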

1.31.19.RocketMQUtils.java

package org.apache.rocketmq.flink;

import java.util.Properties;

public final class RocketMQUtils {

    public static int getInteger(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static long getLong(Properties props, String key, long defaultValue) {
        return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
    }
}
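
A short sketch of how these helpers behave: a missing key falls back to the default, a present key is parsed, and a value that is not a plain integer raises NumberFormatException rather than silently using the default. The class `PropsDefaultsDemo` and the `demo.*` keys are hypothetical; `getInteger` is repeated here only so the example runs on its own.

```java
import java.util.Properties;

/** Hypothetical demo class; the keys below are illustrative, not real RocketMQConfig keys. */
class PropsDefaultsDemo {
    // Same one-liner as RocketMQUtils.getInteger, copied so the sketch is self-contained.
    static int getInteger(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("demo.batch.size", "64");
        props.setProperty("demo.timeout", "3s"); // not a plain integer

        System.out.println(getInteger(props, "demo.batch.size", 32)); // key present: parsed value
        System.out.println(getInteger(props, "demo.missing", 32));    // key absent: default
        try {
            getInteger(props, "demo.timeout", 3000);
        } catch (NumberFormatException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the parse failure propagates, a malformed setting surfaces when the source or sink reads it instead of being masked by the default.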

1.31.20.RunningChecker.java

package org.apache.rocketmq.flink;

import java.io.Serializable;

public class RunningChecker implements Serializable {

    private volatile boolean isRunning = false;

    public boolean isRunning() {
        return isRunning;
    }

    public void setRunning(boolean running) {
        isRunning = running;
    }
}
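
RunningChecker is a volatile flag shared between the thread that runs the source and the thread that cancels it. The sketch below reproduces that pattern with plain threads; `RunningFlagDemo` is a hypothetical class, and unlike the source's awaitTermination() it swallows InterruptedException (restoring the interrupt flag) to keep the example short.

```java
/** Hypothetical demo of the volatile running-flag pattern; not part of the original project. */
class RunningFlagDemo {
    // volatile makes the write in cancel() visible to the polling thread
    private volatile boolean running = false;

    void start() {
        running = true;
    }

    void cancel() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    /** Polls the flag every 50 ms until another thread clears it. */
    void awaitTermination() {
        while (running) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        RunningFlagDemo flag = new RunningFlagDemo();
        flag.start();

        // A second thread plays the role of cancel() being called by the framework.
        Thread canceller = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            flag.cancel();
        });
        canceller.start();

        flag.awaitTermination(); // returns once the canceller has cleared the flag
        System.out.println("running = " + flag.isRunning());
    }
}
```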

1.31.21.DateUtils.java

package com.xxxxx.issue.utils;

import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by Administrator on 2017/3/7.
 *
 * @author Administrator
 */
public final class DateUtils {

    private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);

    /** Date-time pattern: yyyy-MM-dd HH:mm:ss */
    public static final String PATTERN_DATE_TIME = "yyyy-MM-dd HH:mm:ss";
    /** Compact date-time pattern: yyyyMMddHHmmss */
    public static final String PATTERN_DATE_TIME_SIMPLE = "yyyyMMddHHmmss";
    /** Millisecond pattern: yyyyMMddhhmmssSSS (note: hh is the 12-hour clock) */
    public static final String PATTERN_DATE_TIME_ALL = "yyyyMMddhhmmssSSS";
    /** Date pattern: yyyy-MM-dd */
    public static final String PATTERN_DATE = "yyyy-MM-dd";
    /** Compact date pattern: yyyyMMdd */
    public static final String PATTERN_DATE_SIMPLE = "yyyyMMdd";
    /** Compact year-month pattern: yyMM */
    public static final String PATTERN_DATE_SIMPLE_YYMM = "yyMM";
    /** Time pattern: HH:mm:ss */
    public static final String PATTERN_TIME = "HH:mm:ss";
    /** Compact time pattern: HHmmss */
    public static final String PATTERN_TIME_SIMPLE = "HHmmss";
    /** Year pattern: yyyy */
    public static final String PATTERN_YYYY = "yyyy";
    /** Month pattern: MM */
    public static final String PATTERN_MM = "MM";
    /** Day pattern: dd */
    public static final String PATTERN_DAY = "dd";
    /**
     * Maximum timestamp from the epoch until now; getDateStrLongtoString treats
     * larger values as milliseconds and other values as seconds.
     */
    public static final Long TIME_MAX = 999999999999L;
    /** Day-of-week pattern: E */
    public static final String PATTERN_WEEK = "E";

    private static final String DATE_STR = "yyyy-MM-dd";
    private static final String TIME_STR = " 00:00:00";
    private static final String DATE_TIME_STR = "yyyy-MM-dd HH:mm:ss";

    /**
     * @return the current system time in milliseconds, as a Long.
     */
    public static Long getTimeMillisLong() {
        return System.currentTimeMillis();
    }

    /**
     * @return the current system time in seconds, as a Long.
     */
    public static Long getTimeSecondLong() {
        return System.currentTimeMillis() / 1000;
    }

    /**
     * @return the current system time in milliseconds, as a String.
     */
    public static String getTimeMillisString() {
        return System.currentTimeMillis() + "";
    }

    /**
     * @return the current date and time as yyyy-MM-dd HH:mm:ss.
     */
    public static String getNowDateTime() {
        return formateDateTime(PATTERN_DATE_TIME);
    }

    /**
     * @return the current date and time as yyyyMMddHHmmss.
     */
    public static String getNowDateTimeSimple() {
        return formateDateTime(PATTERN_DATE_TIME_SIMPLE);
    }

    /**
     * @return the current date and time formatted with PATTERN_DATE_TIME_ALL (yyyyMMddhhmmssSSS).
     */
    public static String getNowDateTimeAll() {
        return formateDateTime(PATTERN_DATE_TIME_ALL);
    }

    /**
     * @return the current date as yyyy-MM-dd.
     */
    public static String getNowDate() {
        return formateDateTime(PATTERN_DATE);
    }

    /**
     * @return the current date as yyyyMMdd.
     */
    public static String getNowDateSimple() {
        return formateDateTime(PATTERN_DATE_SIMPLE);
    }

    /**
     * @return the current year and month as yyMM.
     */
    public static String getNowDateSimpleYymm() {
        return formateDateTime(PATTERN_DATE_SIMPLE_YYMM);
    }

    /**
     * @return the current time as HH:mm:ss.
     */
    public static String getNowTime() {
        return formateDateTime(PATTERN_TIME);
    }

    /**
     * @return the current time as HHmmss.
     */
    public static String getNowTimeSimple() {
        return formateDateTime(PATTERN_TIME_SIMPLE);
    }

    /**
     * @return the current year as yyyy.
     */
    public static String getNowYear() {
        return formateDateTime(PATTERN_YYYY);
    }

    /**
     * @return the current month as MM.
     */
    public static String getNowMonth() {
        return formateDateTime(PATTERN_MM);
    }

    /**
     * @return the current day of the month as dd.
     */
    public static String getNowDay() {
        return formateDateTime(PATTERN_DAY);
    }

    /**
     * @return the current day of the week, e.g. 星期一 (Monday) in a Chinese locale.
     */
    public static String getNowWeek() {
        return formateDateTime(PATTERN_WEEK);
    }

    /**
     * @param pattern custom format pattern
     * @return the current time formatted with the custom pattern, or an empty string if the pattern is blank.
     */
    public static String getNowDateTime(String pattern) {
        if (StringUtils.isNotBlank(pattern)) {
            return formateDateTime(pattern);
        } else {
            return "";
        }
    }

    /**
     * @return the first day of the current month.
     */
    public static String getStartDayOfMonth() {
        DateTime dateTime = new DateTime();
        return dateTime.dayOfMonth().withMinimumValue().withTimeAtStartOfDay().toString(PATTERN_DATE);
    }

    /**
     * @return the last day of the current month.
     */
    public static String getEndDayOfMonth() {
        DateTime dateTime = new DateTime();
        return dateTime.dayOfMonth().withMaximumValue().millisOfDay().withMaximumValue().toString(PATTERN_DATE);
    }

    /**
     * @param month number of months to add
     * @return the current time plus the given number of months, as a millisecond string
     * @throws Exception propagated to the caller
     */
    public static String addMonth(int month) throws Exception {
        // Set the date format
        SimpleDateFormat df = new SimpleDateFormat(PATTERN_DATE_TIME);
        String validatetime = df.format(new Date());
        Date now = df.parse(validatetime);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(now);
        calendar.add(Calendar.MONTH, month);
        return calendar.getTime().getTime() + "";
    }

    /**
     * Adds or subtracts the given number of days.
     *
     * @param date the input time
     * @param num  number of days to add (positive) or subtract (negative)
     * @return the shifted date
     */
    public static Date addOrMinusDay(Date date, int num) {
        Calendar calendar = Calendar.getInstance();
        // Start from the date passed in (the original used new Date() and ignored the argument).
        calendar.setTime(date);
        calendar.add(Calendar.DAY_OF_MONTH, num);
        return calendar.getTime();
    }

    /**
     * @param date    time string
     * @param pattern format of the time string
     * @return the time string converted to milliseconds; the string must match the pattern.
     */
    public static Long getStringToLong(String date, String pattern) {
        if (StringUtils.isBlank(date) || StringUtils.isBlank(pattern)) {
            return 0L;
        } else {
            DateTime dateTime = DateTimeFormat.forPattern(pattern).parseDateTime(date);
            return dateTime.getMillis();
        }
    }

    /**
     * @param date    time string
     * @param pattern format of the time string
     * @return the time string converted to seconds; the string must match the pattern.
     */
    public static int getStringToIntSeconds(String date, String pattern) {
        if (StringUtils.isBlank(date) || StringUtils.isBlank(pattern)) {
            return 0;
        } else {
            DateTime dateTime = DateTimeFormat.forPattern(pattern).parseDateTime(date);
            return Integer.parseInt(dateTime.getMillis() / 1000 + "");
        }
    }

    /**
     * @param millis  time value in milliseconds, as a String
     * @param pattern target format
     * @return the millisecond string converted to a formatted time string.
     */
    public static String getLongToString(String millis, String pattern) {
        DateTime dateTime = new DateTime(Long.parseLong(millis));
        return dateTime.toString(pattern);
    }

    /**
     * @param millis  time value in milliseconds
     * @param pattern target format
     * @return the millisecond value converted to a formatted time string.
     */
    public static String getLongToString(long millis, String pattern) {
        DateTime dateTime = new DateTime(millis);
        return dateTime.toString(pattern);
    }

    /**
     * @param seconds duration in seconds
     * @return the duration as days, hours, minutes and seconds.
     */
    public static String getRuntimeBySecond(int seconds) {
        long diffSeconds = seconds % 60;
        long diffMinutes = seconds / 60 % 60;
        long diffHours = seconds / (60 * 60) % 24;
        long diffDays = seconds / (24 * 60 * 60);
        StringBuffer buffer = new StringBuffer();
        buffer.append(diffDays + "天" + diffHours + "小时" + diffMinutes + "分钟" + diffSeconds + "秒");
        return buffer.toString();
    }

    /**
     * @param millis duration in milliseconds
     * @return the duration as days, hours, minutes and seconds.
     */
    public static String getRuntimeByMillis(long millis) {
        long diffSeconds = millis / 1000 % 60;
        long diffMinutes = millis / (60 * 1000) % 60;
        long diffHours = millis / (60 * 60 * 1000) % 24;
        long diffDays = millis / (24 * 60 * 60 * 1000);
        StringBuffer buffer = new StringBuffer();
        buffer.append(diffDays + "天" + diffHours + "小时" + diffMinutes + "分钟" + diffSeconds + "秒");
        return buffer.toString();
    }

    /**
     * @param pattern format pattern
     * @return the current time formatted with the pattern.
     */
    private static String formateDateTime(String pattern) {
        DateTime dateTime = new DateTime();
        return dateTime.toString(pattern);
    }

    /**
     * @param time time string (yyyy-MM-dd)
     * @return the String converted to a Calendar
     * @throws ParseException propagated to the caller
     */
    public static Calendar changecal(String time) throws ParseException {
        // Convert the type
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Date date = sdf.parse(time);
        Calendar cal = Calendar.getInstance();
        cal.setTime(date);
        return cal;
    }

    /**
     * @param cal calendar object
     * @return the Calendar converted to a String (yyyy-MM-dd)
     */
    public static String changestr(Calendar cal) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String time = sdf.format(cal.getTime());
        return time;
    }

    /**
     * @param cal calendar object
     * @return day-of-year indexes of the first and last day of the month containing the date
     */
    public static List<Integer> flmonthdate(Calendar cal) {
        // Get the first and last day of this month
        int monthMum = cal.get(Calendar.MONTH);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        int firstDayInThisMonth = cal.get(Calendar.DAY_OF_YEAR);
        int nextMonth = monthMum + 1;
        cal.set(Calendar.MONTH, nextMonth);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        int firstDayInNextMonth = cal.get(Calendar.DAY_OF_YEAR);
        int lastDayInThisMonth = firstDayInNextMonth - 1;
        List<Integer> list = new ArrayList<Integer>();
        list.add(0, firstDayInThisMonth);
        list.add(1, lastDayInThisMonth);
        return list;
    }

    /**
     * @param cal the given calendar object
     * @return day-of-year indexes of the given day and of the last day of its month
     */
    public static List<Integer> twoDay(Calendar cal) {
        int firstDay = cal.get(Calendar.DAY_OF_YEAR);
        int monthMum = cal.get(Calendar.MONTH);
        int firstDayInThisMonth = cal.get(Calendar.DAY_OF_YEAR);
        int nextMonth = monthMum + 1;
        cal.set(Calendar.MONTH, nextMonth);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        int firstDayInNextMonth = cal.get(Calendar.DAY_OF_YEAR);
        int lastDayInThisMonth = firstDayInNextMonth - 1;
        List<Integer> list = new ArrayList<Integer>();
        list.add(0, firstDay);
        list.add(1, lastDayInThisMonth);
        return list;
    }

    /**
     * @param i day-of-year index
     * @return the day index converted to milliseconds, as a String
     */
    public static String changeday(int i) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        // Set the date
        cal.set(Calendar.DAY_OF_YEAR, i);
        // Convert the date to milliseconds
        String time = Long.toString(DateUtils.getStringToLong(sdf.format(cal.getTime()), DateUtils.PATTERN_DATE));
        return time;
    }

    /**
     * @param date the given time
     * @return the number of days in that month
     * @throws ParseException propagated to the caller
     */
    public static int getMaxDay(String date) throws ParseException {
        Calendar cal = changecal(date);
        int day = cal.getActualMaximum(Calendar.DATE);
        return day;
    }

    /**
     * @param date the given time
     * @return the number of days in that month, excluding Sundays
     * @throws ParseException propagated to the caller
     */
    public static int getDayNoSunday(String date) throws ParseException {
        Calendar cal = changecal(date);
        List<Integer> day = flmonthdate(cal);
        int days = 0;
        for (int i = day.get(0); i <= day.get(1); i++) {
            cal.set(Calendar.DAY_OF_YEAR, i);
            int weekDay = cal.get(Calendar.DAY_OF_WEEK);
            if (weekDay != 1) {
                days++;
            }
        }
        return days;
    }

    /**
     * @param date the given time string
     * @return the number of days from the given day to the end of the month, excluding Sundays
     * @throws ParseException propagated to the caller
     */
    public static int getDayNoSundayBewToDay(String date) throws ParseException {
        Calendar cal = changecal(date);
        List<Integer> day = twoDay(cal);
        int days = 0;
        for (int i = day.get(0); i <= day.get(1); i++) {
            cal.set(Calendar.DAY_OF_YEAR, i);
            int weekDay = cal.get(Calendar.DAY_OF_WEEK);
            if (weekDay != 1) {
                days++;
            }
        }
        return days;
    }

    /**
     * @param date the given time string
     * @return the number of days from the given day to the end of the month, excluding Saturdays and Sundays
     * @throws ParseException propagated to the caller
     */
    public static int getDayNoWeekendBewToDay(String date) throws ParseException {
        Calendar cal = changecal(date);
        List<Integer> day = twoDay(cal);
        int days = 0;
        for (int i = day.get(0); i <= day.get(1); i++) {
            cal.set(Calendar.DAY_OF_YEAR, i);
            int weekDay = cal.get(Calendar.DAY_OF_WEEK);
            if (weekDay != 1 && weekDay != 7) {
                days++;
            }
        }
        return days;
    }

    /**
     * @param date the given time string
     * @return the number of days from the given day to the end of the month
     * @throws ParseException propagated to the caller
     */
    public static int getDayNoWeekend(String date) throws ParseException {
        Calendar cal = changecal(date);
        List<Integer> day = twoDay(cal);
        int days = 0;
        for (int i = day.get(0); i <= day.get(1); i++) {
            days++;
        }
        return days;
    }

    /**
     * @param i day-of-year index
     * @return the day index converted to a String (yyyy-MM-dd)
     */
    public static String getday(int i) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        // Set the date
        cal.set(Calendar.DAY_OF_YEAR, i);
        String day = changestr(cal);
        return day;
    }

    /**
     * @param time time string, format: yyyy-MM-dd hh:mm:ss
     * @return the time converted to milliseconds
     * @throws ParseException propagated to the caller
     */
    public static long dateChangeMillisecond(String time) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        // Milliseconds
        long millionSeconds = sdf.parse(time).getTime();
        return millionSeconds;
    }

    /**
     * @param time time string, format: yyyyMMddhhmmss
     * @return the time converted to milliseconds
     * @throws ParseException propagated to the caller
     */
    public static long dateChangeMillisecond1(String time) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddhhmmss");
        // Milliseconds
        long millionSeconds = sdf.parse(time).getTime();
        return millionSeconds;
    }

    /**
     * @return the start of the current day, in seconds (date formatted as yyyyMMdd)
     * @throws ParseException propagated to the caller
     */
    public static long getTodayStartLongSecond() throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        // Append 000000 to get midnight of the current day
        String nowDate = sdf.format(new Date()) + "000000";
        return dateChangeMillisecond1(nowDate) / 1000;
    }

    /**
     * @return the end of the current day, in seconds (start of the day plus 24 hours)
     * @throws ParseException propagated to the caller
     */
    public static long getTodayEndLongSecond() throws ParseException {
        return getTodayStartLongSecond() + 24 * 60 * 60;
    }

    /**
     * @param times   timestamp; values above TIME_MAX are treated as milliseconds, others as seconds
     * @param dateFor target format
     * @return the timestamp formatted with dateFor
     */
    public static String getDateStrLongtoString(Long times, String dateFor) {
        SimpleDateFormat sdf = new SimpleDateFormat(dateFor);
        String str = "";
        if (times > TIME_MAX) {
            Date date = new Date(times);
            str = sdf.format(date);
        } else {
            Date date = new Date(times * 1000);
            str = sdf.format(date);
        }
        return str;
    }

    /**
     * @param year  the given year
     * @param month the given month
     * @param flag  true for the earliest time of the month, false for the latest
     * @return the earliest or the latest time of the given month
     */
    public static Date getStartOrEndDateInTargetMonth(int year, int month, boolean flag) {
        GregorianCalendar ca = new GregorianCalendar();
        ca.clear();
        ca.set(Calendar.YEAR, year);
        ca.set(Calendar.MONTH, month - 1);
        Calendar calendar = Calendar.getInstance();
        // Set the time
        calendar.setTime(ca.getTime());
        if (flag) {
            // Earliest moment of the target month
            calendar.set(Calendar.MONTH, calendar.get(Calendar.MONTH));
            // Set the day
            calendar.set(Calendar.DAY_OF_MONTH, 1);
            // Set the hour
            calendar.set(Calendar.HOUR_OF_DAY, 0);
            // Set the minute
            calendar.set(Calendar.MINUTE, 0);
            // Set the second
            calendar.set(Calendar.SECOND, 0);
            // Set the millisecond
            calendar.set(Calendar.MILLISECOND, 0);
        } else {
            // Set the day
            calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMaximum(Calendar.DAY_OF_MONTH));
            // Set the hour
            calendar.set(Calendar.HOUR_OF_DAY, 23);
            // Set the minute
            calendar.set(Calendar.MINUTE, 59);
            // Set the second
            calendar.set(Calendar.SECOND, 59);
            // Set the millisecond
            calendar.set(Calendar.MILLISECOND, 999);
        }
        return calendar.getTime();
    }

    /**
     * <p class="detail">
     * Purpose: format a date as a string
     * </p>
     * @author tangy
     * @param date   the time
     * @param format the format
     * @return the date formatted as a string
     */
    public static String dateFormat(Date date, String format) {
        return new SimpleDateFormat(format).format(date);
    }

    //    public static void main(String[] args) {
    //        LOG.info(DateUtils.dateFormat(new Date(),DateUtils.PATTERN_DATE_SIMPLE));
    //    }

    public static Date parseDate(String datestr, String format) throws ParseException {
        return new SimpleDateFormat(format).parse(datestr);
    }

    /**
     * <p class="detail">
     * Purpose: for date queries, get the start time of the query range by type
     * </p>
     * @author tangy
     * @param dateType 10 today, 20 yesterday, 30 past 7 days, 35 past 14 days, 40 past 30 days,
     *                 50 past 3 months, 60 past 6 months, 70 past year
     * @return midnight (00:00:00) of the day obtained after subtracting the range
     */
    public static Date getStartDateByType(Integer dateType) {
        Date curDate = new Date();
        Date sDate = null;
        Calendar rightNow = Calendar.getInstance();
        rightNow.setTime(curDate);
        if (dateType == null) {
            sDate = new Date();
        } else if (dateType == 10) {
            try {
                sDate = parseDate(getMorningToString(curDate), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 20) {
            // Shift the date back by one day
            rightNow.add(Calendar.DAY_OF_YEAR, -1);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 30) {
            rightNow.add(Calendar.DAY_OF_YEAR, -6);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 35) {
            rightNow.add(Calendar.DAY_OF_YEAR, -13);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 40) {
            rightNow.add(Calendar.MONTH, -1);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 50) {
            rightNow.add(Calendar.MONTH, -3);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 60) {
            rightNow.add(Calendar.MONTH, -6);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } else if (dateType == 70) {
            rightNow.add(Calendar.MONTH, -12);
            try {
                sDate = parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return sDate;
    }

    /**
     * <p class="detail">
     * Purpose: get midnight (00:00:00) of the given date
     * </p>
     * @author tangy
     * @param date the date object
     * @return midnight (00:00:00) of the given date, as a string
     */
    public static String getMorningToString(Date date) {
        return dateFormat(date, DATE_TIME_STR).substring(0, 10) + TIME_STR;
    }

    /**
     * <p class="detail">
     * Purpose: get the end of day (23:59:59) of the given date
     * </p>
     * @author tangy
     * @param date the date object
     * @return the end of day (23:59:59) of the given date, as a string
     */
    public static String getNightToString(Date date) {
        return dateFormat(date, DATE_TIME_STR).substring(0, 10) + " 23:59:59";
    }

    /**
     * <p class="detail">
     * Purpose: get the first day of the previous month
     * </p>
     * @author tangy
     * @return the first day of the previous month
     */
    public static Date getLastMonthFirstDay() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MONTH, -1);
        calendar.set(Calendar.DAY_OF_MONTH, 1);
        return calendar.getTime();
    }

    /**
     * <p class="detail">
     * Purpose: get the last day of the previous month
     * </p>
     * @author tangy
     * @return the last day of the previous month
     */
    public static Date getLastMonthEndDay() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.DAY_OF_MONTH, 1);
        calendar.add(Calendar.DATE, -1);
        return calendar.getTime();
    }

    /**
     * <p class="detail">
     * Purpose: get the first day of the current month
     * </p>
     * @author zhanghl
     * @return the first day of the current month
     */
    public static String getCurrentMonthFirstDay() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.DAY_OF_MONTH, 1);
        return dateFormat(calendar.getTime(), DATE_TIME_STR).substring(0, 10) + TIME_STR;
    }

    /**
     * <p class="detail">
     * Purpose: get the first day of the month containing the given time
     * </p>
     * @author tangy
     * @param date the time
     * @return 00:00:00 on the first day of the month containing the given time
     */
    public static String getMonthFirstDay(Date date) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        calendar.set(Calendar.DAY_OF_MONTH, 1);
        return dateFormat(calendar.getTime(), DATE_STR) + TIME_STR;
    }

    /**
     * <p class="detail">
     * Purpose: get the last day of the month containing the given time
     * </p>
     * @author tangy
     * @param date the time
     * @return 23:59:59 on the last day of the month containing the given time
     */
    public static String getMonthEndDay(Date date) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        calendar.set(Calendar.DATE, calendar.getActualMaximum(Calendar.DATE));
        return dateFormat(calendar.getTime(), DATE_STR) + " 23:59:59";
    }

    /**
     * <p class="detail">
     * Purpose: format a time string according to the given pattern; returns empty if the time format is invalid
     *
</p>* @author tangy* @param dateStr 时间字符串* @param dateFormat (如:yyyy-MM-dd HH:mm:ss)* @return 按格式格式化时间字符串,如果时间格式不对则返回空*/public static String getDateFormat(String dateStr,String dateFormat){String result = null;if(dateStr!=null && dateFormat!=null){try{result=dateFormat(parseDate(dateStr,dateFormat),dateFormat);}catch(Exception e){logger.info(e.getMessage());}}return result;}/*** <p class="detail">* 功能:时间加减某个天数后的时间* </p>* @author tangy* @param date 要加减的时间* @param day 要加减的天数,减天数传负数* @return 时间加减某个天数后的时间*/public static Date addSubtractDate(Date date,int day){Calendar resultCalendar = Calendar.getInstance();resultCalendar.setTime(date);resultCalendar.add(Calendar.DAY_OF_YEAR,day);return resultCalendar.getTime();}public static Date timeAddTwoHour(){Date now = new Date();try {return parseDate(dateFormat(new Date(now.getTime()+7200000),"yyyy-MM-dd HH:mm"),"yyyy-MM-dd HH:mm");} catch (ParseException e) {e.printStackTrace();}return now;}/*** <p class="detail">* 功能:时间戳转换成字符窜* </p>* @author zhangqi* @param time 时间戳* @return 时间戳转换成字符窜*/public static String getDateToString(long time) {Date d = new Date(time);SimpleDateFormat sf = new SimpleDateFormat(DATE_TIME_STR);return sf.format(d);}/*** <p class="detail">* 功能:字符串转换成时间戳* </p>* @author zhangqi* @param time 时间的字符串* @return 字符串转换成时间戳*/public static long getStringToDate(String time) {SimpleDateFormat sdf = new SimpleDateFormat(DATE_TIME_STR);Date date = new Date();try{date = sdf.parse(time);} catch(ParseException e) {e.printStackTrace();}return date.getTime();}/*** <p >* 功能:获取当天时间的时间戳 (精确时分秒)* </p>* @param* @author chenyx* @date   * @return XXX*/public static Long getNowDateToDate(){SimpleDateFormat sf = new SimpleDateFormat(DATE_TIME_STR);String nowDateStr = sf.format(new Date());Date date = new Date();try {date = sf.parse(nowDateStr);} catch (ParseException e) {// TODO Auto-generated catch blocke.printStackTrace();}return date.getTime();}/*** <p >* 功能:获取当天时间的时间戳 (年月日)* </p>* @param* @author chenyx* @return Long型年月日时间戳*/public 
static Long getYearMonthDayTimeStamp(){SimpleDateFormat sf = new SimpleDateFormat(DATE_STR);String nowDateStr = sf.format(new Date());Date date = new Date();try {date = sf.parse(nowDateStr);} catch (ParseException e) {// TODO Auto-generated catch blocke.printStackTrace();}return date.getTime();}/*** <p >* 功能:两个时间之间的秒数* </p>** @param time 第一个时间* @param bigTime 第二个时间* @author zhangq* @return XXX*/public static int countTimes(Date time, Date bigTime){long timeNum = time.getTime();long bigTimeNum = bigTime.getTime();int count = (int)((bigTimeNum - timeNum) / 1000);return count;}
}
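
The month-boundary helper above relies on `Calendar` arithmetic. As a hedged alternative, the same boundaries can be sketched with `java.time`, which is available on the Java 8 target used by this project. The class name `MonthBounds` and its two methods are hypothetical and not part of the original `DateUtils`.

```java
import java.time.LocalDateTime;
import java.time.YearMonth;

// Hypothetical helper, not part of the original project.
class MonthBounds {

    // First instant of the given month (month is 1-12).
    static LocalDateTime startOfMonth(int year, int month) {
        return YearMonth.of(year, month).atDay(1).atStartOfDay();
    }

    // Last millisecond of the given month, matching the 23:59:59.999 used above.
    static LocalDateTime endOfMonth(int year, int month) {
        return YearMonth.of(year, month).atEndOfMonth().atTime(23, 59, 59, 999_000_000);
    }

    public static void main(String[] args) {
        System.out.println(startOfMonth(2021, 2)); // 2021-02-01T00:00
        System.out.println(endOfMonth(2020, 2));   // 2020-02-29T23:59:59.999
    }
}
```

`YearMonth.atEndOfMonth()` handles leap years, so no manual "actual maximum day" lookup is needed.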

1.31.22.PropertiesUtils.java

package com.xxxxx.issue.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.Properties;

/**
 * Loads project-config.properties once and exposes its values through getters.
 *
 * @author tuzuoquan
 * @version 1.0
 * @ClassName PropertiesUtils
 * @date 2020/9/23 9:23
 **/
public final class PropertiesUtils {

    private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtils.class);

    private static PropertiesUtils instance = null;

    /** Interval between checkpoints **/
    private Integer flinkCheckpointsInterval = null;
    /** Minimum pause between two checkpoints, in ms **/
    private Integer flinkMinPauseBetweenCheckpoints = null;
    /** Checkpoint timeout: a checkpoint that does not finish in time is discarded **/
    private Integer flinkCheckpointTimeout = null;
    /** Maximum number of concurrent checkpoints **/
    private Integer flinkMaxConcurrentCheckpoints = null;
    /** Number of restart attempts **/
    private Integer flinkFixedDelayRestartTimes = null;
    /** Delay between two restart attempts **/
    private Integer flinkFixedDelayRestartInterval = null;

    private String rocketmqNameServer = null;
    private String rocketMqTopics = null;

    /** Parallelism of the RocketMQ source **/
    private Integer rockeqMqSourceParallelism = null;
    /** Parallelism of the Redis sink **/
    private Integer redisSinkParallelism = null;

    private Integer redisDefaultExpirationTime = null;

    /**
     * Private constructor: reads every setting from project-config.properties.
     */
    private PropertiesUtils() {
        // Read the properties file from the classpath through the class loader
        try (InputStream in = PropertiesUtils.class.getClassLoader()
                .getResourceAsStream("project-config.properties")) {
            Properties prop = new Properties();
            prop.load(in);

            rocketmqNameServer = prop.getProperty("rocketmq.name.server.addr").trim();
            rocketMqTopics = prop.getProperty("rocketmq.topics").trim();

            flinkCheckpointsInterval = Integer.parseInt(prop.getProperty("flink.checkpoint.interval").trim());
            flinkMinPauseBetweenCheckpoints = Integer.parseInt(prop.getProperty("flink.checkpoint.minPauseBetweenCheckpoints").trim());
            flinkCheckpointTimeout = Integer.parseInt(prop.getProperty("flink.checkpoint.checkpointTimeout").trim());
            flinkMaxConcurrentCheckpoints = Integer.parseInt(prop.getProperty("flink.checkpoint.maxConcurrentCheckpoints").trim());
            flinkFixedDelayRestartTimes = Integer.parseInt(prop.getProperty("flink.fixedDelayRestart.times").trim());
            flinkFixedDelayRestartInterval = Integer.parseInt(prop.getProperty("flink.fixedDelayRestart.interval").trim());

            rockeqMqSourceParallelism = Integer.parseInt(prop.getProperty("flink.rockeqmq.source.parallelism").trim());
            redisSinkParallelism = Integer.parseInt(prop.getProperty("flink.redis.sink.parallelism").trim());
            redisDefaultExpirationTime = Integer.parseInt(prop.getProperty("redis.default.expiration.time").trim());
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Returns the lazily created singleton; synchronized so that concurrent callers share one instance. */
    public static synchronized PropertiesUtils getInstance() {
        if (instance == null) {
            instance = new PropertiesUtils();
        }
        return instance;
    }

    public Integer getFlinkCheckpointsInterval() {
        return flinkCheckpointsInterval;
    }

    public Integer getFlinkMinPauseBetweenCheckpoints() {
        return flinkMinPauseBetweenCheckpoints;
    }

    public Integer getFlinkCheckpointTimeout() {
        return flinkCheckpointTimeout;
    }

    public Integer getFlinkMaxConcurrentCheckpoints() {
        return flinkMaxConcurrentCheckpoints;
    }

    public String getRocketmqNameServer() {
        return rocketmqNameServer;
    }

    public String getRocketMqTopics() {
        return rocketMqTopics;
    }

    public Integer getRockeqMqSourceParallelism() {
        return rockeqMqSourceParallelism;
    }

    public Integer getRedisSinkParallelism() {
        return redisSinkParallelism;
    }

    public Integer getFlinkFixedDelayRestartTimes() {
        return flinkFixedDelayRestartTimes;
    }

    public Integer getFlinkFixedDelayRestartInterval() {
        return flinkFixedDelayRestartInterval;
    }

    public Integer getRedisDefaultExpirationTime() {
        return redisDefaultExpirationTime;
    }
}
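
In the loader above, `prop.getProperty(key).trim()` throws a bare `NullPointerException` when a key is missing from the file. The following is a minimal sketch of a lookup that reports the missing key by name. The class `ConfigSketch` and the method `requiredInt` are hypothetical names, and the configuration is read from an in-memory string only so that the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch, not part of the original project.
class ConfigSketch {

    private final Properties props = new Properties();

    // Loads key=value pairs from text; the real class reads a classpath resource instead.
    ConfigSketch(String text) {
        try (StringReader reader = new StringReader(text)) {
            props.load(reader);
        } catch (IOException e) {
            throw new IllegalStateException("cannot read configuration", e);
        }
    }

    // Returns the integer value of a mandatory key, or fails with a message naming the key.
    int requiredInt(String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalStateException("missing configuration key: " + key);
        }
        // Properties keeps trailing whitespace in values, hence the trim().
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        ConfigSketch cfg = new ConfigSketch("flink.checkpoint.interval = 5000 \n");
        System.out.println(cfg.requiredInt("flink.checkpoint.interval")); // 5000
        try {
            cfg.requiredInt("flink.redis.sink.parallelism");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast with the key name makes a misconfigured deployment easier to diagnose than a stack trace pointing at `trim()`.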

1.31.23.RedisUtil.java

package com.xxxxx.issue.utils;

import com.xxxxx.tmc.cache.service.impl.TqCacheServiceImpl;
import com.xxxxx.tmc.commons.constant.CacheLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName RedisUtil
 * @date 2020/10/28 16:06
 **/
public final class RedisUtil {

    private static Logger LOG = LoggerFactory.getLogger(RedisUtil.class);

    private static TqCacheServiceImpl cacheService = new TqCacheServiceImpl("");

    public static TqCacheServiceImpl getCacheServiceInstance() {
//        if (null != cacheService) {
//            return cacheService;
//        }
//
//        cacheService = new TqCacheServiceImpl("");
        return cacheService;
    }

    public static void main(String[] args) throws InterruptedException {
        TqCacheServiceImpl cacheService = RedisUtil.getCacheServiceInstance();
//        cacheService.set("issue:test",2);
//        cacheService.set("issue:2:20210108:000000:0:3:1.1.10.",2);
//        cacheService.set("issue:2:20210108:000000:0:4:1.1.10.1.",23);
//        LOG.info("=================================");
//        LOG.info(cacheService.get("issue:2:20210108:000000:0:4:1.1.10.1.").toString());
//        for(int i = 0 ; i < 10000; ++i) {
//            cacheService.set("issue:test",i);
//            System.out.println(cacheService.get("issue:test"));
//        }
        LOG.info("=================================");
        String code = "issue:2:20210113:000000:0:4:1.1.10.1.";
        System.out.println(cacheService.getWithCacheLevel("default", code, CacheLevel.REMOTE));
        LOG.info("=================================");
        //cacheService.destroy();
    }
}

1.31.24.IssueConstants.java

package com.xxxxx.issue.constant;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName IssueConstants
 * @date 2020/9/21 8:55
 **/
public final class IssueConstants {

    /** Key prefix **/
    public static final String ISSUE_CODE_PREFIX = "issue:";
    public static final String ISSUE_CODE_COLON = ":";
    public static final String TOPIC_SPLITTER = "##";

    /** Handling dimensions **/
    /** Department together with its subordinates **/
    public static final String DIMENSION_DEPARTMENT_ALL = "0";
    /** Only what the department handled itself **/
    public static final String DIMENSION_DEPARTMENT_ONLY = "1";
    /** Single user **/
    public static final String DIMENSION_DEPARTMENT_USER = "2";

    /** 1. Number of issues accepted **/
    public static final String ROCKETMQ_ACCEPT_CONSUMER_GROUP_1 = "group_1";
    public static final String ROCKETMQ_ACCEPT_TAG = "issue_accept_operat";
    /** Metric type **/
    public static final String ISSUE_ACCEPT_TYPE = "1";

    /** 2. Number of issues that occurred **/
    public static final String ROCKETMQ_HAPPEN_CONSUMER_GROUP_2 = "group_2";
    public static final String ROCKETMQ_HAPPEN_TAG = "issue_accept_operat || issue_add_operat || issue_delete_operat";
    /** Metric type **/
    public static final String ISSUE_HAPPEN_TYPE = "2";

    /** 3. Tags for closed issues **/
    public static final String ROCKETMQ_PASS_COMPLETE_CONSUMER_GROUP_3 = "group_3";
    public static final String ROCKETMQ_PASS_COMPLETE_TAG = "issue_inspect_pass_operat || issue_complete_operat";
    /** Metric type **/
    public static final String ISSUE_PASS_COMPLETE_TYPE = "3";

    /** 4. Number of sign-offs **/
    public static final String ROCKETMQ_SIGNFOR_CONSUMER_GROUP_4 = "group_4";
    public static final String ROCKETMQ_SIGNFOR_TAG = "issue_signfor_operat";
    /** Metric type **/
    public static final String ISSUE_SIGNFOR_TYPE = "4";

    /** 5. Tags for handling operations **/
    public static final String ROCKETMQ_HANDLE_CONSUMER_GROUP_5 = "group_5";
    public static final String ROCKETMQ_HANDLE_TAG = "issue_comment_operat || issue_assignReply_operat || issue_complete_operat "
            + " || issue_report_operat || issue_assign_operat";
    /** Metric type **/
    public static final String ISSUE_HANDLE_TYPE = "5";
}

1.31.25.IssueAcceptRedisSink.java

package com.xxxxx.issue.redissink;

import com.xxxxx.issue.constant.IssueConstants;
import com.xxxxx.issue.utils.PropertiesUtils;
import com.xxxxx.tmc.cache.service.impl.TqCacheServiceImpl;
import com.xxxxx.tmc.commons.constant.CacheLevel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName IssueAcceptRedisSink
 * @date 2020/9/22 14:49
 **/
public class IssueAcceptRedisSink extends RichSinkFunction<Tuple4<String, Integer, String, String>> {

    private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptRedisSink.class);

    TqCacheServiceImpl cacheService;

    public IssueAcceptRedisSink() {
        // The cache client is created in open(), once per parallel sink instance.
    }

    @Override
    public void invoke(Tuple4<String, Integer, String, String> input) {
        try {
            generateKeyAndData(input.f0, input.f1, input.f2, input.f3);
        } catch (Exception e) {
            // Pass the exception itself so that the stack trace is logged
            LOG.error("Failed to process the accepted-issue count", e);
            throw e;
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            LOG.info("=======================cacheService init=============================");
            this.cacheService = new TqCacheServiceImpl("");
            LOG.info("=======================cacheService end=============================");
        } catch (Exception var3) {
            LOG.error("Redis has not been properly initialized: ", var3);
            throw var3;
        }
    }

    @Override
    public void close() throws IOException {
        try {
            LOG.info("=======================cacheService close start =============================");
            this.cacheService.destroy();
            this.cacheService = null;
            LOG.info("=======================cacheService close end  =============================");
        } catch (Exception e) {
            LOG.error("Failed to destroy the Redis cacheService", e);
        }
    }

    /**
     * Increments the "self only" counter and one "department" counter per hierarchy level.
     * Note: the read-then-write below is only synchronized within this sink instance;
     * it is not atomic across parallel sink instances.
     */
    private synchronized void generateKeyAndData(String handlerOrgCode, Integer operatType, String dayTime, String tenantId) {
        if (StringUtils.isNotBlank(handlerOrgCode)) {
            String[] codeSegment = handlerOrgCode.split("\\.");
            int length = codeSegment.length;

            // "Self only" dimension of the department
            String theirOwnCode = new StringBuilder(IssueConstants.ISSUE_CODE_PREFIX) // marks an issue key
                    .append(IssueConstants.ISSUE_ACCEPT_TYPE)                          // metric type: accepted count
                    .append(IssueConstants.ISSUE_CODE_COLON)
                    .append(dayTime)                                                   // date such as 20200918
                    .append(IssueConstants.ISSUE_CODE_COLON)
                    .append(tenantId)                                                  // tenant id
                    .append(IssueConstants.ISSUE_CODE_COLON)
                    .append(IssueConstants.DIMENSION_DEPARTMENT_ONLY)                  // self-only dimension
                    .append(IssueConstants.ISSUE_CODE_COLON)
                    .append(length)                                                    // hierarchy level
                    .append(IssueConstants.ISSUE_CODE_COLON)
                    .append(handlerOrgCode)                                            // the department's own code
                    .toString();

            Long theirOwnNum = 1L;
            // If the key already exists in Redis, add one to the stored value
            Object ownCached = this.cacheService.getWithCacheLevel("default", theirOwnCode, CacheLevel.REMOTE);
            if (null != ownCached) {
                theirOwnNum = (Long) ownCached + 1;
                LOG.info("theirOwnNum=" + theirOwnNum);
            }
            //this.cacheService.set(theirOwnCode,PropertiesUtils.getInstance().getRedisDefaultExpirationTime(),theirOwnNum);
            this.cacheService.setWithCacheLevel("default", theirOwnCode,
                    PropertiesUtils.getInstance().getRedisDefaultExpirationTime(), theirOwnNum, CacheLevel.REMOTE);

            for (int level = 1; level <= length; level++) {
                // issue:type:date:tenant_id:dimension:level:code
                String codePrefix = new StringBuilder(IssueConstants.ISSUE_CODE_PREFIX) // marks an issue key
                        .append(IssueConstants.ISSUE_ACCEPT_TYPE)                        // metric type: accepted count
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(dayTime)                                                 // date such as 20200918
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(tenantId)                                                // tenant id
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(IssueConstants.DIMENSION_DEPARTMENT_ALL)                 // department dimension
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(level)                                                   // hierarchy level
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .toString();

                // Join the first `level` segments, each followed by a dot
                StringBuilder codeSuffix = new StringBuilder();
                for (int j = 0; j < level; j++) {
                    if (StringUtils.isBlank(codeSuffix.toString())) {
                        codeSuffix.append(codeSegment[j]);
                        continue;
                    }
                    codeSuffix.append(".").append(codeSegment[j]);
                }
                codeSuffix.append(".");

                String code = codePrefix + codeSuffix.toString();
                LOG.info(code);

                Long num = 1L;
                Object cached = this.cacheService.getWithCacheLevel("default", code, CacheLevel.REMOTE);
                if (null != cached) {
                    num = (Long) cached + 1;
                }
                //this.cacheService.set(code,PropertiesUtils.getInstance().getRedisDefaultExpirationTime(),num);
                this.cacheService.setWithCacheLevel("default", code,
                        PropertiesUtils.getInstance().getRedisDefaultExpirationTime(), num, CacheLevel.REMOTE);
            }
        }
    }
}
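
To make the key layout produced by `generateKeyAndData` easier to follow, here is a small standalone sketch that only builds the key strings, without touching Redis. The class `IssueKeyDemo` and the method `keysFor` are hypothetical. The constants are inlined from `IssueConstants`, and the sample organization code `1.1.10.` follows the keys that appear in the `RedisUtil` test code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the key rule issue:type:date:tenant_id:dimension:level:code
class IssueKeyDemo {

    static List<String> keysFor(String type, String day, String tenantId, String orgCode) {
        String[] segments = orgCode.split("\\."); // "1.1.10." -> [1, 1, 10]
        String base = "issue:" + type + ":" + day + ":" + tenantId + ":";
        List<String> keys = new ArrayList<>();
        // Dimension 1: one key for the department itself, at its own level
        keys.add(base + "1:" + segments.length + ":" + orgCode);
        // Dimension 0: one key per ancestor level, down to the department's level
        for (int level = 1; level <= segments.length; level++) {
            String prefix = String.join(".", Arrays.copyOfRange(segments, 0, level)) + ".";
            keys.add(base + "0:" + level + ":" + prefix);
        }
        return keys;
    }

    public static void main(String[] args) {
        for (String key : keysFor("1", "20210108", "000000", "1.1.10.")) {
            System.out.println(key);
        }
        // issue:1:20210108:000000:1:3:1.1.10.
        // issue:1:20210108:000000:0:1:1.
        // issue:1:20210108:000000:0:2:1.1.
        // issue:1:20210108:000000:0:3:1.1.10.
    }
}
```

One incoming event therefore touches `levels + 1` counters: its own department and every level above it.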

1.31.26.IssueAcceptFlinkHandlerByCustomRedisSink.java

package com.xxxxx.issue.flink.handler;

import com.alibaba.fastjson.JSON;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import com.xxxxx.issue.constant.IssueConstants;
import com.xxxxx.issue.redissink.IssueAcceptRedisSink;
import com.xxxxx.issue.utils.DateUtils;
import com.xxxxx.issue.utils.PropertiesUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.flink.RocketMQConfig;
import org.apache.rocketmq.flink.RocketMQSource;
import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Real-time processing of today's "accepted count" (type = 1).
 *     Meaning: number of items the pre-acceptance center accepted as issues
 *     Scope: own department, own level and subordinates (including functional departments at the same level)
 *     Time range: today
 *     Metric source: TAG: issue_accept_operat   operatType: 61
 *
 * Key rule:
 * issue:type:date:tenant_id:dimension:level:code
 * issue:              marks an issue key
 * type                metric type  1: accepted   2: occurred   3: closed
 * date                date such as 20200918
 * tenant_id           tenant id
 * dimension           0: department   1: self only
 * level               hierarchy level
 * code                the organization code up to this level
 *
 * When the code has 6 segments:
 *     department dimension (dimension = 0)      6 values  issue:type:date:dimension:level:code
 *     self only (dimension = 1)                 1 key     issue:type:date:dimension:level:code
 *
 * With a user id: issue:type:date:dimension:userid       key is id:code (the accepted count currently has no per-user statistics)
 *     the dimension value for a single user is 2
 *
 * @author jun
 * @version 1.1
 * @ClassName IssueAcceptFlinkHandlerByCustomRedisSink
 * @date 2020/9/14 16:37
 **/
public class IssueAcceptFlinkHandlerByCustomRedisSink {

    private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptFlinkHandlerByCustomRedisSink.class);

    private static final Integer OPERATTYPE_1 = 61;

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        PropertiesUtils instance = PropertiesUtils.getInstance();

        // Restart strategy: fixed delay
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                instance.getFlinkFixedDelayRestartTimes(),
                Time.of(instance.getFlinkFixedDelayRestartInterval(), TimeUnit.MINUTES)));

        // Interval between checkpoints
        env.enableCheckpointing(instance.getFlinkCheckpointsInterval());
        // Exactly-once mode (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between two checkpoints (configured value, in ms)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(instance.getFlinkMinPauseBetweenCheckpoints());
        // Checkpoint timeout: a checkpoint that does not finish in time is discarded
        env.getCheckpointConfig().setCheckpointTimeout(instance.getFlinkCheckpointTimeout());
        // Maximum number of checkpoints in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(instance.getFlinkMaxConcurrentCheckpoints());
        // Keep checkpoint data after the job is cancelled so that it can be restored from a chosen checkpoint (see the notes for details)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, instance.getRocketmqNameServer());
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, IssueConstants.ROCKETMQ_ACCEPT_CONSUMER_GROUP_1);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, IssueConstants.ROCKETMQ_ACCEPT_TAG);

        String topic = instance.getRocketMqTopics();
        if (StringUtils.isBlank(topic)) {
            return;
        }
        String[] topics = topic.split(IssueConstants.TOPIC_SPLITTER);

        // Union of all per-topic sources
        DataStream<Map> finalDataStreamSource = null;
        for (String topicItem : topics) {
            consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, topicItem.trim());
            DataStream<Map> dataStreamSource = env.addSource(
                    new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "issueInfo"), consumerProps));
            finalDataStreamSource = (null == finalDataStreamSource)
                    ? dataStreamSource
                    : finalDataStreamSource.union(dataStreamSource);
        }
        if (null == finalDataStreamSource) {
            return;
        }

        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> mainDataStream = finalDataStreamSource
                .process(new ProcessFunction<Map, Tuple4<String, Integer, String, String>>() {
                    @Override
                    public void processElement(Map in, Context ctx, Collector<Tuple4<String, Integer, String, String>> out) throws Exception {
                        String issueInfo = in.get("issueInfo").toString();
                        LOG.info(issueInfo);
                        try {
                            IssueSyncMessageBodyVO issueMsgVO = JSON.parseObject(issueInfo, IssueSyncMessageBodyVO.class);
                            // Today's date
                            String dayTime = DateUtils.dateFormat(new Date(), DateUtils.PATTERN_DATE_SIMPLE);

                            // Fields that make up the key
                            String handlerOrgCode = null;
                            Integer operatType = null;
                            String tenantId = null;
                            if (null != issueMsgVO && null != issueMsgVO.getBody()) {
                                // Organization code of the handling party
                                handlerOrgCode = issueMsgVO.getBody().getHandleOrgCode();
                                operatType = issueMsgVO.getOperatType();
                                tenantId = issueMsgVO.getBody().getTenantId();
                            }

                            if (null != operatType && 0 == operatType.compareTo(IssueAcceptFlinkHandlerByCustomRedisSink.OPERATTYPE_1)) {
                                //generateKeyAndData(handlerOrgCode,operatType,dayTime,out);
                                Tuple4<String, Integer, String, String> outVal = new Tuple4<>(handlerOrgCode, operatType, dayTime, tenantId);
                                out.collect(outVal);
                            }
                        } catch (Exception e) {
                            LOG.error("Failed to consume the issue message: {}", issueInfo, e);
                        }
                    }
                })
                .name("issueAccept-mq-source")
                .uid("issueAccept")
                .setParallelism(instance.getRockeqMqSourceParallelism());

        // Attach the Redis sink and set the sink parallelism on the sink itself.
        // Calling setParallelism on mainDataStream would overwrite the parallelism of the process operator above.
        mainDataStream.addSink(new IssueAcceptRedisSink())
                .name("IssueAcceptRedisSink")
                .uid("IssueAcceptRedisSink")
                .setParallelism(instance.getRedisSinkParallelism());

        try {
            String jobName = null;
            if (args.length == 0) {
                jobName = IssueAcceptFlinkHandlerByCustomRedisSink.class.getSimpleName();
            } else {
                jobName = args[0];
            }
            env.execute(jobName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.32. Other Flink examples

1.32.1. Generating data with DataGen

package com.toto.demo.test;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;

public class DataGeneratorSourceDemo {

    public static void main(String[] args) {
        test1();
    }

    private static void test1() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String[] fieldNames = new String[] { "id", "state", "score" };
        DataGenerator<?>[] fieldGenerators = new DataGenerator[] {
                RandomGenerator.intGenerator(0, 100),
                RandomGenerator.booleanGenerator(),
                RandomGenerator.intGenerator(0, 100)
        };

        // First argument: the array of DataGenerators; second argument: the field names
        RowDataGenerator rowDataGenerator = new RowDataGenerator(fieldGenerators, fieldNames);

        // DataGeneratorSource arguments: the generator, the number of rows emitted per second,
        // and the total number of rows to generate
        DataStreamSource<RowData> source = env
                .addSource(new DataGeneratorSource<>(rowDataGenerator, 10, 20L), TypeInformation.of(RowData.class))
                .setParallelism(1);

        source.print().setParallelism(2);

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output:
1> +I(73,true,43)
2> +I(35,true,99)
1> +I(44,false,19)
2> +I(47,true,8)
1> +I(93,false,37)
2> +I(38,false,79)
1> +I(40,false,16)
2> +I(70,false,57)
1> +I(78,false,50)
2> +I(57,false,71)
1> +I(58,false,56)
2> +I(78,true,68)
1> +I(78,true,67)
2> +I(51,true,3)
1> +I(22,false,89)
2> +I(83,false,0)
1> +I(42,false,32)
2> +I(74,false,18)
1> +I(99,true,73)
2> +I(84,false,89)

1.32.2. Storing temporary data with ValueState

package com.toto.demo.test;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StateDemo {

    private static final List<Integer> data = new ArrayList<>(Arrays.asList(1, 2, 3));

    public static void main(String[] args) {
        state();
    }

    public static void state() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(data, TypeInformation.of(Integer.class))
                .keyBy(v -> v % 2)
                .process(new KeyedProcessFunction<Integer, Integer, Integer>() {

                    private ValueState<Integer> sumState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> vsdesc = new ValueStateDescriptor<>("sum", Integer.class);
                        sumState = getRuntimeContext().getState(vsdesc);
                    }

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        int sum = sumState.value() == null ? 0 : sumState.value();
                        System.out.println("oldSum:\t" + sum);
                        System.out.println("value:\t" + value);
                        sum += value;
                        sumState.update(sum);
                        out.collect(sum);
                    }
                })
                .print()
                .setParallelism(2);

        try {
            System.out.println(env.getExecutionPlan());
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
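
The following is a plain-Java sketch of what the keyed `ValueState` in `StateDemo` computes. It is not Flink code: a `HashMap` stands in for the per-key state that Flink manages, and the class name `KeyedSumSketch` is hypothetical. It assumes the same input `1, 2, 3` and the same key selector `v % 2`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a per-key running sum, without Flink.
class KeyedSumSketch {

    static List<Integer> runningSums(List<Integer> input) {
        Map<Integer, Integer> state = new HashMap<>(); // one "ValueState" slot per key
        List<Integer> out = new ArrayList<>();
        for (int value : input) {
            int key = value % 2;                          // same key selector as keyBy(v -> v % 2)
            int sum = state.getOrDefault(key, 0) + value; // a missing state counts as 0
            state.put(key, sum);                          // sumState.update(sum)
            out.add(sum);                                 // out.collect(sum)
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runningSums(Arrays.asList(1, 2, 3))); // [1, 2, 4]
    }
}
```

Elements 1 and 3 share key 1, so the second of them sees the stored sum 1 and emits 4. Element 2 has key 0 and starts from an empty state. In the real job the order of the printed lines can differ, because the two keys may be processed by different parallel subtasks.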
