maven:

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

redis是key-value的形式存储。

redis的安装:

编译安装redis到指定的目录下面
下载地址:http://download.redis.io/releases/
1、tar -zxvf redis-3.2.8.tar.gz -C /usr/local/download/
2、安装gcc支持
yum install -y gcc
3、cd /usr/local/download/redis-3.2.8
make PREFIX=/usr/local/software/redis-3.2.8 install
4、创建软连接
ln -s /usr/local/software/redis-3.2.8  /usr/local/software/redis
5、配置环境变量
编辑/etc/profile
最后一行
export REDIS_HOME=/usr/local/software/redis
export PATH=$PATH:$REDIS_HOME/bin
6、让环境变量生效
source /etc/profile

启动reids服务:
cd  /usr/local/software/redis-3.2.8    redis-server &

查看端口号:
cd  /usr/local/software/redis-3.2.8    netstat -anop |grep 6379

启动cli连接程序端
redis-cli -h localhost -p 6379

使用set name huitao

Flink里面使用redis:

package Flink_API;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.Properties;
public class TestRedis {//主要介绍Flink里面Redis的用法public static void main(String[] args) throws Exception {//创建运行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();//Flink是以数据自带的时间戳字段为准env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度env.setParallelism(1);Properties consumerProperties = new Properties();consumerProperties.setProperty("bootstrap.severs","page01:9001");consumerProperties.setProperty("grop.id","browsegroup");DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));DataStream<UserBrowseLog> processData=dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {@Overridepublic void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {try{UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);if(browseLog !=null){collector.collect(browseLog);}}catch(Exception e){System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());}}});//每个用户浏览商品最大记录DataStream<UserBrowseLog> maxData=processData.keyBy("userID").maxBy("productPrice");maxData.print();//配置redisFlinkJedisConfigBase conf=new FlinkJedisPoolConfig.Builder().setHost("192.168.208.200").setPort(6379).build();maxData.addSink(new RedisSink<>(conf,new MyRedisMapper()));//程序的入口类env.execute("TestRedis");}public static class MyRedisMapper implements RedisMapper<UserBrowseLog> {/*** 指定rredis中的那种操作,这里用SET操作(写入)*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}/*** 表示从接受数据中获取需要操作的key* @param userBrowseLog* @return*/@Overridepublic String getKeyFromData(UserBrowseLog userBrowseLog) {return userBrowseLog.getUserID();}/*** 表示从接受的数据中获取需要操作的redis value* @param userBrowseLog* @return*/@Overridepublic String getValueFromData(UserBrowseLog userBrowseLog) {return String.valueOf(userBrowseLog.getProductPrice());}}//浏览类public static class UserBrowseLog implements Serializable {private String userID;private String eventTime;private String eventType;private String productID;private Integer productPrice;public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getProductID() {return productID;}public void setProductID(String productID) {this.productID = productID;}public Integer getProductPrice() {return productPrice;}public void setProductPrice(Integer productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "UserBrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", eventType='" + eventType + '\'' +", productID='" + productID + '\'' +", productPrice=" + productPrice +'}';}}}

Flink之Redis的安装及RedisSink的用法相关推荐

  1. Linux下redis的安装(适用centos)

    转自:https://www.cnblogs.com/_popc/p/3684835.html 第一部分:安装redis  希望将redis安装到此目录 1 /usr/local/redis 希望将安 ...

  2. Redis 入门安装(Linux)

    Redis 入门安装(Linux) 备注:该案例讲解基于CentOS6.5.Reids3.2.8 Redis 官网 中文地址:http://www.redis.cn/ 英文地址:https://red ...

  3. Redis初学:1(NoSQL的简介和Redis的安装)

    什么是NoSQL NoSQL数据库意即:Not Only SQL 不仅仅是SQL,它区别于传统的关系型数据库,储存方式按照的是key-value的形式存储数据,这个我们可以联想到我们以前学过的Hash ...

  4. Linux下redis的安装

    原文出处:http://www.cnblogs.com/_popc/p/3684835.html 第一步:redis的安装 希望将redis安装到此目录 1 /usr/local/redis 希望将安 ...

  5. Redis、Redis+sentinel安装(Ubuntu 14.04下Redis安装及简单测试)

    Ubuntu下Redis安装两种安装方式: 1.apt-get方式 步骤: 以root权限登录,切换到/usr目录下. 接下来输入命令,apt-get install redis-server,如图: ...

  6. redis的安装以及常见运用场景

    2019独角兽企业重金招聘Python工程师标准>>> 1.redis的安装 Window 下安装 下载地址:https://github.com/MSOpenTech/redis/ ...

  7. 浅谈Redis及其安装配置

    一.Redis的介绍 二.Redis的安装配置 三.Redis的配置文件说明 四.Redis的简单操作 简介: Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型. ...

  8. linux redis数据库安装配置,Linux系统中redis的安装配置步骤

    Linux系统中redis的安装配置步骤 发布时间:2020-06-23 10:13:36 来源:亿速云 阅读:87 作者:Leah 这篇文章将为大家详细讲解有关Linux系统中redis的安装配置步 ...

  9. CentOS下Redis的安装

    CentOS下Redis的安装 前言 安装Redis需要知道自己需要哪个版本,有针对性的安装,比如如果需要redis GEO这个地理集合的特性,那么redis版本就不能低于3.2版本,由于这个特性是3 ...

最新文章

  1. 超强NLP思维导图,知识点全面覆盖:从基础概念到最佳模型,萌新成长必备资源...
  2. 菜鸟学习Spring——60s利用JoinPoint获取參数的值和方法名称
  3. percona-toolkit之pt-kill:杀掉mysql查询或连接
  4. python结束进程树_【python爬虫】线程进程
  5. miracast投屏软件下载_手机画面如何投屏到电视?
  6. Ubuntu下将dash装换成bash
  7. Android系统开发智能机器人,Android智能机器人详解
  8. 苹果下周将推出紫色版iPhone 13 但只有高端版本
  9. 几张一模一样的照片_每隔几百年,就会出现一个和你一模一样的人?这些照片怎么解释?...
  10. click Parameters
  11. 调查VMware View Composer失败代码(2085204)
  12. nginx一个端口配置多域名服务
  13. Jvm工作原理学习笔记(转)
  14. 获取json格式的内容数据时,使用的方法避免空指针
  15. 论文写作中文核心期刊查询和中图检索号查询
  16. iOS 音乐播放器demo讲解
  17. MBA-day17 假言推理:如果的考法与题型
  18. 心得 ~ 使用 zlib库 解压缩 zip文件
  19. 同时买票是怎么实现的_候补购票和抢票有什么不同 候补购票和抢票可以同时进行吗...
  20. 点餐推荐系统_麦当劳智慧餐厅的微信小程序终究将取代人工点餐和自助点餐机...

热门文章

  1. 第十一天 安装Oracle数据库
  2. 一个CSharp类代码,让你的窗体显示的更酷(转)
  3. 恩智浦智能车大赛2020_我院第十三届“恩智浦”杯智能车校内选拔赛宣讲会顺利举行...
  4. 函数的参数个数是不固定_EXCEL这些序号技巧,你还真不一定都知道
  5. 使用glbindbuffers产生访问冲突_预防IP地址冲突的应对方案,你知道吗?
  6. golang mysql连接池原理_redis mysql 连接池 之 golang 实现
  7. python数据可视化的特点_python的数据分析到底是啥?python数据可视化怎么做?
  8. 时尚美妆图片,让你饱眼福的唯美壁纸
  9. 如果你需要万圣节的图片素材来点缀你的节日活动,看这里就对了
  10. 万能广告促销海报,找不到灵感也不怕