1.30. Flink SQL Example: Writing Kafka Data into Hive
1.30.1.1. Scenario, Environment, and Preparation
1.30.1.2. Example Code
1.30.1.2.1. Write the pom.xml file
1.30.1.2.2. Create the configuration file log4j2.properties under the Maven project's resources directory
1.30.1.2.3. Create the configuration file logback.xml under the Maven project's resources directory
1.30.1.2.4. Create the configuration file project-config-test.properties under the Maven project's resources directory
1.30.1.2.5. Write com.xxxxx.log.utils.PropertiesUtils
1.30.1.2.6. Write com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql
1.30.1.2.7. Run commands

1.30. Flink SQL Example: Writing Kafka Data into Hive

1.30.1.1. Scenario, Environment, and Preparation

Scenario: use Flink SQL to write Kafka data into Hive in real time.

(1) Environment

hadoop 3.1.1.3.1.4-315
hive 3.1.0.3.1.4-315
flink 1.12.1

(2) Prerequisites

Add the required dependency jars to $FLINK_HOME/lib. In particular, hive-exec-3.1.0.3.1.4.0-315.jar and libfb303-0.9.3.jar can be copied from /usr/hdp/current/hive-client/lib.

1.30.1.2. Example Code

1.30.1.2.1. Write the pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxxxx.zczl</groupId>
    <artifactId>flink-log-handler</artifactId>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- maven properties -->
        <maven.test.skip>true</maven.test.skip>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <slf4j.version>1.7.25</slf4j.version>
        <fastjson.version>1.2.73</fastjson.version>
        <joda-time.version>2.9.4</joda-time.version>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>3.1.4</hadoop.version>
        <!-- <hive.version>3.1.0.3.1.4.0-315</hive.version><hadoop.version>3.1.1.3.1.4.0-315</hadoop.version> -->
        <!-- <hadoop.version>3.3.0</hadoop.version> -->
        <mysql.connector.java>8.0.22</mysql.connector.java>
        <fileName>flink-log-handler</fileName>
        <!-- <mainClass>com.xxxxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass> -->
    </properties>
    <version>1.0-SNAPSHOT</version>

    <!--<distributionManagement><repository><id>releases</id><layout>default</layout><url>http://nexus.xxxxx.cn/nexus/content/repositories/releases/</url></repository><snapshotRepository><id>snapshots</id><name>snapshots</name><url>http://nexus.xxxxx.cn/nexus/content/repositories/snapshots/</url></snapshotRepository></distributionManagement>-->

    <repositories>
        <!-- <repository><id>releases</id><layout>default</layout><url>http://nexus.xxxxx.cn/nexus/content/repositories/releases/</url></repository><repository><id>snapshots</id><name>snapshots</name><url>http://nexus.xxxxx.cn/nexus/content/repositories/snapshots/</url><snapshots><enabled>true</enabled><updatePolicy>always</updatePolicy><checksumPolicy>warn</checksumPolicy></snapshots></repository><repository><id>xxxxx</id><name>xxxxx</name><url>http://nexus.xxxxx.cn/nexus/content/repositories/xxxxx/</url></repository><repository><id>public</id><name>public</name><url>http://nexus.xxxxx.cn/nexus/content/groups/public/</url></repository> -->
        <!-- newly added -->
        <repository>
            <id>cloudera</id>
            <layout>default</layout>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <!-- <repositories>&lt;!&ndash; Cloudera &ndash;&gt;<repository><id>cloudera-releases</id><url>https://repository.cloudera.com/artifactory/cloudera-repos</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></repository>&lt;!&ndash; Hortonworks &ndash;&gt;<repository><id>HDPReleases</id><name>HDP Releases</name><url>https://repo.hortonworks.com/content/repositories/releases/</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository><repository><id>HortonworksJettyHadoop</id><name>HDP Jetty</name><url>https://repo.hortonworks.com/content/repositories/jetty-hadoop</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository>&lt;!&ndash; MapR &ndash;&gt;<repository><id>mapr-releases</id><url>https://repository.mapr.com/maven/</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository></repositories> -->

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- When Flink runs in YARN mode, executing Flink SQL against Hive needs the flink-shaded-hadoop-2-uber jar -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sequence-file</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.connector.java}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.1</version><scope>provided</scope></dependency> -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda-time.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>${fileName}</finalName>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                    <compilerVersion>${maven.compiler.source}</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>${maven.test.skip}</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Assembly plugin: builds a jar containing all dependencies -->
            <plugin>
                <!--<groupId>org.apache.maven.plugins</groupId>-->
                <artifactId>maven-assembly-plugin</artifactId>
                <!--<version>2.6</version>-->
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- <archive><manifest><mainClass>${mainClass}</mainClass></manifest></archive> -->
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1.30.1.2.2. Create the configuration file log4j2.properties under the Maven project's resources directory

The contents are as follows:

rootLogger.level = ERROR
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
1.30.1.2.3. Create the configuration file logback.xml under the Maven project's resources directory
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Daily rollover -->
            <fileNamePattern>log/generator.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Keep 7 days' worth of history -->
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="ERROR">
        <appender-ref ref="FILE" />
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
1.30.1.2.4. Create the configuration file project-config-test.properties under the Maven project's resources directory
# Test environment
#################################### Kafka configuration (business side) start ###########################################
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
# Consumer configuration
kafka.consumer.group.id=logkit
kafka.consumer.enableAutoCommit=true
kafka.consumer.autoCommitInterval=1000
kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Topics
kafka.exception.topic=lk_exception_log_statistics
kafka.log.topic=lk_log_info_statistics
#################################### Flink configuration start ###########################################
# Trigger a checkpoint every 5 s
flink.checkpoint.interval=5000
# Ensure at least 1000 ms between checkpoints (can be commented out to speed up checkpoint writes === todo ===)
flink.checkpoint.minPauseBetweenCheckpoints=1000
# A checkpoint must complete within 1 min or it is discarded (checkpoint timeout)
flink.checkpoint.checkpointTimeout=60000
# Maximum number of concurrent checkpoints
flink.checkpoint.maxConcurrentCheckpoints=3
# Number of restart attempts
flink.fixedDelayRestart.times=3
# Interval between restart attempts (minutes)
flink.fixedDelayRestart.interval=5
#################################### source and sink
# Kafka source parallelism
flink.kafka.source.parallelism=1
# Hive sink parallelism
flink.hive.sink.parallelism=1
#hive.conf=/usr/hdp/current/hive-client/conf
hive.conf=/usr/hdp/3.1.4.0-315/hive/conf/
hive.zhoushan.database=xxxx_158
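
The checkpoint- and restart-related keys above map directly onto the StreamExecutionEnvironment API. Below is a minimal sketch with the values hard-coded (the class name CheckpointConfigSketch is only for illustration; the actual handler in section 1.30.1.2.6 reads the same values through PropertiesUtils):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flink.checkpoint.interval: trigger a checkpoint every 5 s
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // flink.checkpoint.minPauseBetweenCheckpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // flink.checkpoint.checkpointTimeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // flink.checkpoint.maxConcurrentCheckpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
        // flink.fixedDelayRestart.times / flink.fixedDelayRestart.interval (minutes)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(5, TimeUnit.MINUTES)));
    }
}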
1.30.1.2.5. Write com.xxxxx.log.utils.PropertiesUtils
package com.xxxxx.log.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PropertiesUtils {

    private static Logger logger = LoggerFactory.getLogger(PropertiesUtils.class);

    private static PropertiesUtils instance = null;

    /** Interval (ms) at which checkpoints are triggered **/
    private Integer flinkCheckpointsInterval = null;
    /** Minimum pause (ms) between checkpoints **/
    private Integer flinkMinPauseBetweenCheckpoints = null;
    /** Checkpoint timeout: a checkpoint must complete within this time or it is discarded **/
    private Integer flinkCheckpointTimeout = null;
    /** Maximum number of concurrent checkpoints **/
    private Integer flinkMaxConcurrentCheckpoints = null;
    /** Number of restart attempts **/
    private Integer flinkFixedDelayRestartTimes = null;
    /** Interval between restart attempts **/
    private Integer flinkFixedDelayRestartInterval = null;
    /** Parallelism of the Kafka source **/
    private Integer kafkaSourceParallelism = null;
    /** Parallelism of the Hive sink **/
    private Integer hiveSinkParallelism = null;
    /** Kafka cluster **/
    private String kafkServer = null;
    /** Consumer group id **/
    private String groupId = null;
    private Boolean enableAutoCommit = null;
    private Long autoCommitInterval = null;
    private String keyDeserializer = null;
    private String valueDeserializer = null;
    private String exceptionTopic = null;
    private String logTopic = null;
    private String hiveConf = null;
    private String database = null;

    /**
     * Private constructor: loads the configuration file once.
     */
    private PropertiesUtils() {
        InputStream in = null;
        try {
            // Read the configuration file from the classpath via the class loader
            in = PropertiesUtils.class.getClassLoader().getResourceAsStream("project-config-test.properties");
//            in = PropertiesUtils.class.getClassLoader().getResourceAsStream("test-win10.properties");
//            in = PropertiesUtils.class.getClassLoader().getResourceAsStream("test-linux.properties");
            Properties prop = new Properties();
            prop.load(in);

            // Flink settings
            flinkCheckpointsInterval = Integer.parseInt(prop.getProperty("flink.checkpoint.interval").trim());
            flinkMinPauseBetweenCheckpoints =
                Integer.parseInt(prop.getProperty("flink.checkpoint.minPauseBetweenCheckpoints").trim());
            flinkCheckpointTimeout = Integer.parseInt(prop.getProperty("flink.checkpoint.checkpointTimeout").trim());
            flinkMaxConcurrentCheckpoints =
                Integer.parseInt(prop.getProperty("flink.checkpoint.maxConcurrentCheckpoints").trim());
            flinkFixedDelayRestartTimes = Integer.parseInt(prop.getProperty("flink.fixedDelayRestart.times").trim());
            flinkFixedDelayRestartInterval =
                Integer.parseInt(prop.getProperty("flink.fixedDelayRestart.interval").trim());
            kafkaSourceParallelism = Integer.parseInt(prop.getProperty("flink.kafka.source.parallelism").trim());
            hiveSinkParallelism = Integer.parseInt(prop.getProperty("flink.hive.sink.parallelism").trim());

            // Kafka settings
            kafkServer = prop.getProperty("kafka.bootstrap.servers").trim();
            groupId = prop.getProperty("kafka.consumer.group.id").trim();
            enableAutoCommit = Boolean.valueOf(prop.getProperty("kafka.consumer.enableAutoCommit").trim());
            autoCommitInterval = Long.valueOf(prop.getProperty("kafka.consumer.autoCommitInterval").trim());
            keyDeserializer = prop.getProperty("kafka.consumer.key.deserializer").trim();
            valueDeserializer = prop.getProperty("kafka.consumer.value.deserializer").trim();
            exceptionTopic = prop.getProperty("kafka.exception.topic").trim();
            logTopic = prop.getProperty("kafka.log.topic").trim();

            // Hive settings
            hiveConf = prop.getProperty("hive.conf").trim();
            database = prop.getProperty("hive.zhoushan.database").trim();
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
                logger.error("Failed to close the input stream");
            }
        }
    }

    public static PropertiesUtils getInstance() {
        if (instance == null) {
            instance = new PropertiesUtils();
        }
        return instance;
    }

    public Integer getFlinkCheckpointsInterval() { return flinkCheckpointsInterval; }

    public Integer getFlinkMinPauseBetweenCheckpoints() { return flinkMinPauseBetweenCheckpoints; }

    public Integer getFlinkCheckpointTimeout() { return flinkCheckpointTimeout; }

    public Integer getFlinkMaxConcurrentCheckpoints() { return flinkMaxConcurrentCheckpoints; }

    public Integer getFlinkFixedDelayRestartTimes() { return flinkFixedDelayRestartTimes; }

    public Integer getFlinkFixedDelayRestartInterval() { return flinkFixedDelayRestartInterval; }

    public Integer getKafkaSourceParallelism() { return kafkaSourceParallelism; }

    public Integer getHiveSinkParallelism() { return hiveSinkParallelism; }

    public String getKafkServer() { return kafkServer; }

    public String getGroupId() { return groupId; }

    public Boolean getEnableAutoCommit() { return enableAutoCommit; }

    public Long getAutoCommitInterval() { return autoCommitInterval; }

    public String getKeyDeserializer() { return keyDeserializer; }

    public String getValueDeserializer() { return valueDeserializer; }

    public String getExceptionTopic() { return exceptionTopic; }

    public String getLogTopic() { return logTopic; }

    public String getHiveConf() { return hiveConf; }

    public String getDatabase() { return database; }
}
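
A quick way to sanity-check the configuration is to read it back through the singleton. Below is a minimal sketch (PropertiesUtilsDemo is a hypothetical test class, not part of the project). Note that getInstance() uses unsynchronized lazy initialization, which is acceptable here because the job reads the configuration from a single main thread:

package com.xxxxx.log.utils;

public class PropertiesUtilsDemo {
    public static void main(String[] args) {
        // Lazily loads project-config-test.properties from the classpath on first access
        PropertiesUtils config = PropertiesUtils.getInstance();
        System.out.println("kafka servers  : " + config.getKafkServer());
        System.out.println("exception topic: " + config.getExceptionTopic());
        System.out.println("hive conf dir  : " + config.getHiveConf());
        System.out.println("hive database  : " + config.getDatabase());
    }
}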
1.30.1.2.6. Write com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql

The contents are as follows:

package com.xxxxx.log.flink.handler;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xxxxx.log.utils.PropertiesUtils;

public class ExceptionLogHandlerBySql {

    private static final Logger logger = LoggerFactory.getLogger(ExceptionLogHandlerBySql.class);

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        PropertiesUtils instance = PropertiesUtils.getInstance();

        // Restart strategy: fixed delay
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(instance.getFlinkFixedDelayRestartTimes(),
                Time.of(instance.getFlinkFixedDelayRestartInterval(), TimeUnit.MINUTES)));
        // How often a checkpoint is triggered
        env.enableCheckpointing(instance.getFlinkCheckpointsInterval());
        // Exactly-once mode (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between checkpoints
        // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(instance.getFlinkMinPauseBetweenCheckpoints());
        // A checkpoint must complete within the timeout or it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(instance.getFlinkCheckpointTimeout());
        // Maximum number of concurrent checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(instance.getFlinkMaxConcurrentCheckpoints());
        // Retain checkpoint data after the job is cancelled, so the job can later be restored from a chosen checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        // Flink table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Kafka source table: declared with the DEFAULT dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String sourceDrop = "drop table if exists kafka_exception";
        String sourceTable = "CREATE TABLE kafka_exception ("
                + "  serviceId STRING,"
                + "  serverName STRING,"
                + "  serverIp STRING,"
                + "  title STRING,"
                + "  operationPath STRING,"
                + "  url STRING,"
                + "  stack STRING,"
                + "  exceptionName STRING,"
                + "  exceptionInfo STRING,"
                + "  operationUser STRING,"
                + "  operationIp STRING,"
                + "  orgId BIGINT,"
                + "  methodClass STRING,"
                + "  fileName STRING,"
                + "  methodName STRING,"
                + "  operationData STRING,"
                + "  occurrenceTime BIGINT"
                + ") WITH ("
                + "    'connector' = 'kafka',"
                + "    'topic' = '" + instance.getExceptionTopic() + "',"
                + "    'properties.bootstrap.servers' = '" + instance.getKafkServer() + "',"
                + "    'properties.group.id' = '" + instance.getGroupId() + "',"
                + "    'scan.startup.mode' = 'earliest-offset',"
                + "    'format' = 'json',"
                + "    'json.fail-on-missing-field' = 'false',"
                + "    'json.ignore-parse-errors' = 'true'"
                + "  )";
        System.out.println("================= source SQL start ========================");
        tableEnv.executeSql(sourceDrop);
        tableEnv.executeSql(sourceTable);
        System.out.println(sourceTable);
        System.out.println("================= source SQL end ========================");

        // Hive catalog (the catalog name can be anything)
        String name = "mycatalog";
        String defaultDatabase = instance.getDatabase();
        String hiveConfDir = instance.getHiveConf();
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);

        // Hive sink: switch to the HIVE dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.useDatabase(defaultDatabase);
        String sinkDrop = "drop table if exists hive_exception";
        String sinkTable = "CREATE TABLE hive_exception ("
                + "  service_id STRING,"
                + "  server_name STRING,"
                + "  server_ip STRING,"
                + "  title STRING,"
                + "  operation_path STRING,"
                + "  url STRING,"
                + "  stack STRING,"
                + "  exception_name STRING,"
                + "  exception_info STRING,"
                + "  operation_user STRING,"
                + "  operation_ip STRING,"
                + "  org_id BIGINT,"
                + "  method_class STRING,"
                + "  file_name STRING,"
                + "  method_name STRING,"
                + "  operation_data STRING,"
                + "  occurrence_time STRING"
                + " ) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES ("
                + "     'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',"
                + "     'sink.partition-commit.trigger'='process-time',"
                + "     'sink.partition-commit.delay'='0 s',"
                + "     'sink.partition-commit.policy.kind'='metastore,success-file'"
                + ")";
        System.out.println("================= sink SQL start ========================");
        tableEnv.executeSql(sinkDrop);
        tableEnv.executeSql(sinkTable);
        System.out.println(sinkTable);
        System.out.println("================= sink SQL end ========================");

        // Continuous INSERT from the Kafka source into the partitioned Hive table
        String sql = "INSERT INTO TABLE hive_exception"
                + " SELECT serviceId, serverName, serverIp, title, operationPath, url, stack, exceptionName, exceptionInfo, operationUser, operationIp,"
                + " orgId, methodClass, fileName, methodName, operationData, from_unixtime(cast(occurrenceTime/1000 as bigint),'yyyy-MM-dd HH:mm:ss'), from_unixtime(cast(occurrenceTime/1000 as bigint),'yyyy-MM-dd')"
                + " FROM kafka_exception";
        tableEnv.executeSql(sql);
    }
}
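
Once the job has been running and partitions have been committed, the sink table can be inspected from the same catalog. Below is a minimal sketch (HiveExceptionCheck is a hypothetical helper class; the catalog name, database, and Hive conf dir mirror the values used above) that runs an ad-hoc batch query against the committed data:

package com.xxxxx.log.flink.handler;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveExceptionCheck {
    public static void main(String[] args) {
        // Batch mode is enough for an ad-hoc check of what the streaming job has written
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Same catalog name, default database, and Hive conf dir as the streaming job
        HiveCatalog hive = new HiveCatalog("mycatalog", "xxxx_158", "/usr/hdp/3.1.4.0-315/hive/conf/");
        tableEnv.registerCatalog("mycatalog", hive);
        tableEnv.useCatalog("mycatalog");
        tableEnv.useDatabase("xxxx_158");

        // Print a handful of rows committed by the streaming job
        tableEnv.executeSql("SELECT dt, service_id, exception_name, occurrence_time FROM hive_exception LIMIT 10").print();
    }
}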
1.30.1.2.7. Run commands

Option 1: standalone mode

$FLINK_HOME/bin/flink run \
-c com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql \
/root/cf_temp/flink-log-handler.jar

Option 2: yarn-cluster mode

$FLINK_HOME/bin/flink run -d -m yarn-cluster \
-yqu real_time_processing_queue \
-p 1 -yjm 1024m -ytm 1024m -ynm ExceptionLogHandler \
-c com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql \
/root/cf_temp/flink-log-handler.jar

Option 3: yarn-session mode (this first requires starting a YARN session to obtain the corresponding application_id, so it was not tested here)

$FLINK_HOME/bin/yarn-session.sh -d -nm yarnsession01 -n 2 -s 3 -jm 1024m -tm 2048m
$FLINK_HOME/bin/flink run -d -yid application_1603447441975_0034 \
-c com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql \
/root/cf_temp/flink-log-handler.jar ExceptionLogSession

Sample JSON message (occurrenceTime is epoch milliseconds, matching the BIGINT column of the Kafka source table):

{"serviceId":"test000","serverName":"xxx","serverIp":"xxx.xxx.xxx.xxx","title":"xxxx","operationPath":"/usr/currunt","url":"http://baidu.com","stack":"xxx","exceptionName":"xxxx","exceptionInfo":"xxxx","operationUser":"chenfeng","operationIp":"xxx.xxx.xxx.xxx","orgId":777777,"methodClass":"com.xxxxx.Test","fileName":"test.txt","methodName":"findname","operationData":"name=kk","occurrenceTime":"2021-05-12 09:23:20"}
