一:简介

  1. Ignite是什么?
            一个以内存为中心的分布式数据库、缓存和处理平台,可以在PB级数据中,以内存级的速度进行事务性、
    分析性以及流式负载的处理。
           ignite是分布式内存网格的一种实现,其基于java平台,具有可持久化,分布式事务,分布式计算等特点,此外还支持丰富的键值存储以及SQL语法(基于h2引擎),可以看成是一个分布式内存数据库。

二:基于Ignite的java简单应用实例

  1. 基本使用:
        ignite有两种使用方式: 一种是从官网下载release版本程序,解压运行部署,另外一种是通过嵌入式集成进现有应用程序。
    这里我们使用的是嵌入式集成进现有应用程序。
  2. 环境准备:
         Ignite版本:最新版2.7.5
         springboot版本:2.1.6.RELEASE
         MAVEN:3.5.2
         数据库:Oracle12.2.0.1.0
         zookeeper:zookeeper-3.4.10
         kafka:kafka_2.12-2.2.0
  3. 项目结构如下:
               
        ibor-common:
              
       ibor-pulsar:
            
        ibor-quantfin:
            
  4. 首先我们需要引入项目所需要的jar,这里我们在ibor-common中引入,这个模块作为一个公共的jar分别引入ibor-pulsar,ibor-quantfin.
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kgf.ibor</groupId><artifactId>ibor-common</artifactId><version>0.0.1-SNAPSHOT</version><!--指定spring-boot的依赖版本  --><parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> </parent><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><ignite.version>2.7.5</ignite.version><druid.version>1.1.19</druid.version><ojdbc8.version>12.2.0.1.0</ojdbc8.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId></dependency><!--下面引入Ignite包  --><dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-core</artifactId><version>${ignite.version}</version></dependency><dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-zookeeper</artifactId><version>${ignite.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-spring</artifactId><version>${ignite.version}</version><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-context</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-indexing</artifactId><version>${ignite.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency><dependency><groupId>com.oracle</groupId><artifactId>ojdbc8</artifactId><version>${ojdbc8.version}</version></dependency><!--spring集成kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-context</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>${druid.version}</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></dependency></dependencies>
    </project>

    注意:上面的springboot版本和Ignite包有一些冲突,需要剔除一些冲突的jar,上面已经剔除。

  5. 应用要实现的功能简介
      ⑴首先我们会分别在ibor-pulsar和ibor-quantfin启动一个节点,这两个节点形成一个集群,并且这两个节点的数据是互通的,
          可以相互读取到对方缓存中的数据。除此之外,我们还可以将Oracle数据库中的数据加载到Ignite缓存中,并且可以将一
          些数据持久化到Oracle数据库中。并且Ignite支持Sql查询,和键值对查询等等方式。
          最后加一个kafka发送和监听数据的例子。
  6. ibor-pulsar服务搭建
     ⑴pom.xml文件,引入ibor-common依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kgf.ibor</groupId><artifactId>ibor-pulsar</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><dependency><groupId>com.kgf.ibor</groupId><artifactId>ibor-common</artifactId><version>0.0.1-SNAPSHOT</version></dependency></dependencies><build><plugins><!--指定打包插件,可以将项目打包为可执行jar  --><plugin><groupId> org.springframework.boot </groupId> <artifactId> spring-boot-maven-plugin </artifactId> </plugin><!--指定编译的版本  --><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
    </project>

    ⑵application.yml,配置了kafka,Oracle,Ignite等相关信息,注意:ibor-pulsar和ibor-quantfin必须注册到同一个zookeeper中

    server:port: 9090 #指定端口号#===========kafka配置================
    spring:kafka:bootstrap-servers:- localhost:9092 #指定kafka代理地址,可以是多个producer: #以下是生产者配置retries: 0  #如果这个值大于0,表示启用重试失败的发送次数batch-size:16384 #每当多个记录被发送到统一分区时,生产者会尝试将记录一起批量处理为更少的请求,这有助于提高客户端和服务器性能,默认16384buffer-memory:33554432 #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432key-serializer: #下面用来指定key和消息体的编解码方式org.apache.kafka.common.serialization.StringSerializervalue-serializer:org.apache.kafka.common.serialization.StringSerializerconsumer: #下面是消费者配置group-id: ibor-consumer-kgf #指定默认消费者的groupId,由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groupid设置组名##earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费#latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 #none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: earliestenable-auto-commit: false #如果为true,则消费者的偏移量将在后台定期提交,默认值为true auto-commit-interval: 100 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。key-deserializer: #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializerorg.apache.kafka.common.serialization.StringDeserializervalue-deserializer: #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializerorg.apache.kafka.common.serialization.StringDeserializer#============Ignite配置信息===========
    ignite:clientmode: false
    ibor: domain: DATA_QUANT #配置Ignite节点属性zookeeper:address: localhost:2181 #配置zookeeper连接地址          #=============数据库信息配置==============
    jdbc:driverClassName: oracle.jdbc.driver.OracleDriverurl: jdbc:oracle:thin:@localhost:1521:orclusername: systempassword: 897570          

    ⑶applicationContext.xml文件,配置了数据源对象,以及Ignite集群相关配置。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd"><!--1:引入自定义在application.yml中的信息  --><bean id="yamlProperties" class="org.springframework.beans.factory.config.YamlPropertiesFactoryBean"><property name="resources" value="classpath:application.yml"></property></bean><context:property-placeholder properties-ref="yamlProperties"></context:property-placeholder> <!--2:数据库配置  --><bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"  destroy-method="close"> <property name="driverClassName" value="${jdbc.driverClassName}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> <!-- 配置初始化大小、最小、最大 --> <property name="initialSize" value="1" /> <property name="minIdle" value="1" /> <property name="maxActive" value="10" /><!-- 配置获取连接等待超时的时间 --> <property name="maxWait" value="10000" /><!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --> <property name="timeBetweenEvictionRunsMillis" value="60000" /><!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --> <property name="minEvictableIdleTimeMillis" value="300000" /><property name="testWhileIdle" value="true" /><!-- 这里建议配置为TRUE,防止取到的连接不可用 --> <property name="testOnBorrow" value="true" /> <property name="testOnReturn" value="false" /><!-- 打开PSCache,并且指定每个连接上PSCache的大小 --> <property name="poolPreparedStatements" value="true" /> <property name="maxPoolPreparedStatementPerConnectionSize" value="20" /><!-- 这里配置提交方式,默认就是TRUE,可以不用配置 --><property name="defaultAutoCommit" value="true" /><!-- 验证连接有效与否的SQL,不同的数据配置不同 --> <property name="validationQuery" value="select 1 from dual" /> <!--配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙  --><property name="filters" value="stat,wall,log4j"/> <!--通过connectProperties属性来打开mergeSql功能;慢SQL记录  --><property name="connectionProperties" value="druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000"/></bean><!--3:定义一个用来获取IP地址的bean对象  --><bean id="localAddress" class="java.net.InetAddress" factory-method="getLocalHost"></bean><!--4:定义Ignite对象,懒加载,在第一次获取该对象时加载  --><bean id="ignite" class="org.apache.ignite.IgniteSpringBean"><property name="configuration"><bean id="igniteConf" class="org.apache.ignite.configuration.IgniteConfiguration"><!--true表示设置该节点为客户端模式,false为服务端模式  --><property name="clientMode" value="${ignite.clientmode:true}"></property><!-- Ignite的分布式计算支持Zero Deployment,实现了节点间的字节码交换。当Peer Class Loading enable时,不需要在网格内的每个节点上手工地部署用以计算的Java或者Scala代码。当一个enable了Peer Class Loading的节点加入一个没有打开Peer Class Loading的集群中时会出差,导致节点无法启动,反之亦然。推荐在配置文件中默认打开--><property name="peerClassLoadingEnabled" value="true"></property><!--用来获取关于节点的动态信息,默认每2秒更新一次  --><property name="metricsUpdateFrequency" value="1000"></property><!--设置公共线程池大小  --><property name="publicThreadPoolSize" value="64"></property><!--设置系统线程池大小  --><property name="systemThreadPoolSize" value="32"></property><property name="memoryConfiguration"><bean class="org.apache.ignite.configuration.MemoryConfiguration"><!--堆外缓存最小必须是10M,这里我们设置4G  --><property name="defaultMemoryPolicySize" value="#{4L*1024*1024*1024}"></property></bean></property><!--配置集群发现机制,Ignite基于zookeeper节点发现  --><property name="discoverySpi"><bean id="tcpDiscoverySpi" class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"><!--配置查找Ignite节点的方式,基于zookeeper查找  --><property name="ipFinder"><bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.zk.TcpDiscoveryZookeeperIpFinder"><!--连接zookeeper  --><property name="zkConnectionString" value="${ibor.zookeeper.address}"/></bean></property><!--Ignite客户端连接地址,这里设置本地IP地址  --><property name="localAddress" value="#{localAddress.getHostAddress()}"></property><!--设置消息确认超时时间  --><property name="ackTimeout" value="6000"></property></bean></property><!--配置节点属性值  --><property name="userAttributes"><map><!-- 设置节点信息 --><entry key="DOMAIN" value="${ibor.domain:PULSAR}"></entry></map></property></bean></property></bean>
    </beans>

    ⑷logback-spring.xml,日志配置

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="false"><!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径--><property name="LOG_HOME" value="/logs" /><!--控制台日志, 控制台输出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度,%msg:日志消息,%n是换行符--><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern></encoder></appender><!--文件日志, 按照每天生成日志文件 --><appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!--日志文件输出的文件名--><FileNamePattern>${LOG_HOME}/pulsar.log.%d{yyyy-MM-dd}.log</FileNamePattern><!--日志文件保留天数--><MaxHistory>30</MaxHistory></rollingPolicy><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern></encoder><!--日志文件最大的大小--><triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"><MaxFileSize>10MB</MaxFileSize></triggeringPolicy></appender><!-- show parameters for hibernate sql 专为 Hibernate 定制 --><logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE" /><logger name="org.hibernate.type.descriptor.sql.BasicExtractor" level="DEBUG" /><logger name="org.hibernate.SQL" level="DEBUG" /><logger name="org.hibernate.engine.QueryParameters" level="DEBUG" /><logger name="org.hibernate.engine.query.HQLQueryPlan" level="DEBUG" /><!--myibatis log configure--><logger name="com.apache.ibatis" level="TRACE"/><logger name="java.sql.Connection" level="DEBUG"/><logger name="java.sql.Statement" level="DEBUG"/><logger name="java.sql.PreparedStatement" level="DEBUG"/><!-- 日志输出级别 --><root level="info"><appender-ref ref="STDOUT" /><appender-ref ref="FILE"/></root>
    </configuration>

    ⑸启动类IborPulsarApplication.java
         
    ⑹创建IgniteInitation.java类,在项目启动时,初始化Ignite对象,以及相关cache缓存实例。
          
     ⑺创建IgniteUtil.java用来初始化和获取Ignite实例对象
           
    ⑻创建IgniteCacheManager.java,用来初始化Ignite中各个Cache对象,并且初始化service层的cache对象。

    package com.kgf.ibor.pulsar.utils;import javax.annotation.Resource;import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.cache.CacheMode;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;import com.kgf.ibor.common.pulsar.cachestore.TestCacheStoreFactory;
    import com.kgf.ibor.common.pulsar.model.PulsarConstant;
    import com.kgf.ibor.common.pulsar.model.TestKey;
    import com.kgf.ibor.common.pulsar.model.TestValue;
    import com.kgf.ibor.common.utils.DomainNodeFilter;
    import com.kgf.ibor.pulsar.service.TestService;@Component
    public class IgniteCacheManager {@Value("${ibor.domain:PULSAR}")private String domainName;@Resourceprivate IgniteUtil igniteUtil;@Resourceprivate TestService testService;/**** 该方法在IgniteInitation中被调用*/public void init() {try {//初始化缓存initCaches();//初始化service中的cacheinitServices();} catch (Exception e) {e.printStackTrace();}}private void initServices() {testService.init();}private void initCaches() {initTestCache();}private void initTestCache() {//1:首先获取Ignite对象Ignite ignite = igniteUtil.getInstance();IgniteCache<TestKey, TestValue> cache = ignite.cache(PulsarConstant.CACHE_TEST_INFO);if(cache!=null) {return;}//2:自定义存储区域的使用方式CacheConfiguration<TestKey, TestValue> cacheCfg = new CacheConfiguration<TestKey,TestValue>();//3:设置cache名称cacheCfg.setName(PulsarConstant.CACHE_TEST_INFO);//4:设置缓存模式,这里我们使用分区模式,能存储海量数据,频繁更新对其影响不大cacheCfg.setCacheMode(CacheMode.PARTITIONED);//5:设置一个备份,防止数据丢失cacheCfg.setBackups(1);//6:设置缓存Table名称,这里设置通用cacheCfg.setSqlSchema("PUBLIC");//7:设置关联配置的数据类型,key-valuecacheCfg.setIndexedTypes(TestKey.class,TestValue.class);//8;设置节点过滤器,获取我们配置文件中的节点信息cacheCfg.setNodeFilter(new DomainNodeFilter(this.domainName));//9:支持ReadThrough持久化方法,如果要从数据库中加载数据,必须启用这个cacheCfg.setReadThrough(true);//10:启用WriteThrough,可以向数据库中写数据cacheCfg.setWriteThrough(true);//11:配置持久化工厂cacheCfg.setCacheStoreFactory(new TestCacheStoreFactory());//12:启用缓存度量收集cacheCfg.setStatisticsEnabled(true);//13:创建或者获取cache对象cache = ignite.getOrCreateCache(cacheCfg);//14:加载数据到缓存cache.loadCache(null, PulsarConstant.LOAD_HISTORY_DATA);}
    }
    

    ⑼创建TestKey,TestValue对象,这两个我们需要建立在ibor-common模块,因为ibor-quantfin节点获取数据时也需要。当然,我们
        可以根据需求自定义key-value.
        
    ⑽定义各个cache的名称变量
        
     ⑾创建DomainNodeFilter.java,这个是对节点属性的一个拦截校验,必须建立在ibor-common,否则集群起不来。
         
     ⑿创建TestCacheStore,这个是用来和数据库进行交互的,也要建立在ibor-common。

    package com.kgf.ibor.common.pulsar.cachestore;import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Collection;import javax.cache.Cache.Entry;
    import javax.cache.integration.CacheLoaderException;
    import javax.cache.integration.CacheWriterException;
    import javax.sql.DataSource;import org.apache.commons.lang3.StringUtils;
    import org.apache.ignite.cache.store.CacheStore;
    import org.apache.ignite.cache.store.CacheStoreAdapter;
    import org.apache.ignite.lang.IgniteBiInClosure;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;import com.kgf.ibor.common.pulsar.model.PulsarConstant;
    import com.kgf.ibor.common.pulsar.model.TestKey;
    import com.kgf.ibor.common.pulsar.model.TestValue;/*** 该store用来和数据库进行交互* @author KGF**/
    public class TestCacheStore extends CacheStoreAdapter<TestKey, TestValue> implements CacheStore<TestKey, TestValue>{private transient DataSource dataSource; private Logger log = LoggerFactory.getLogger(TestCacheStore.class);private static final String TABLE_NAME = "IBOR_TEST_KGF";private static final Integer BATCH_NUM = 1000;public void setDataSource(DataSource dataSource) {this.dataSource = dataSource;}/**** 加载数据到缓存中*/@Overridepublic void loadCache(IgniteBiInClosure<TestKey, TestValue> clo, Object... args) {if(dataSource==null)return;String lable = (String) args[0];if(lable.equalsIgnoreCase(PulsarConstant.LOAD_HISTORY_DATA)) {try (Connection conn = dataSource.getConnection()){String sql = "SELECT TEST_ID,TEST_NAME,TEST_DATE FROM "+TABLE_NAME;try(PreparedStatement st = conn.prepareStatement(sql)){try(ResultSet rs = st.executeQuery()){while (rs.next()) {String testId = rs.getString(1);String testName = rs.getString(2);String testDate = rs.getString(3);TestKey k = new TestKey(testId, testDate);TestValue v = new TestValue(testId, testName, testDate);clo.apply(k, v);log.info("[TestCacheStore]: load TestValue:"+v);}log.info("[TestCacheStore]: load TestValue finished...");}}} catch (Exception e) {e.printStackTrace();throw new CacheLoaderException("Failed to load TestValue values to store");}}else {log.info("[TestCacheStore]: loadCache fail,lable is not right");}}@Overridepublic TestValue load(TestKey key) throws CacheLoaderException {return null;}/**** 单条数据存储*/@Overridepublic void write(Entry<? extends TestKey, ? extends TestValue> entry) throws CacheWriterException {if(dataSource==null)return;Connection conn = null;PreparedStatement st = null;try {conn = dataSource.getConnection();String sql = getMergeSql();st = conn.prepareStatement(sql);TestValue value = entry.getValue();String testId = value.getTestId();if(StringUtils.isBlank(testId)) {return;}st.setString(1,testId);st.setString(2,value.getTestName());st.setString(3,value.getTestDate());st.addBatch();st.executeUpdate();log.info("[ibor-TestValue] persistence TestValue:"+value);} catch (Exception e) {try {conn.rollback();} catch (SQLException e1) {e1.printStackTrace();}throw new CacheLoaderException("Failed to write values to database",e);}finally {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}@Overridepublic void delete(Object key) throws CacheWriterException {}/**** 批量写入数据库*/@Overridepublic void writeAll(Collection<Entry<? extends TestKey, ? extends TestValue>> entries) {if(dataSource==null)return;Connection conn = null;PreparedStatement st = null;try {conn = dataSource.getConnection();conn.setAutoCommit(false);String sql = getMergeSql();st = conn.prepareStatement(sql);int testNum = 0;for (Entry<? extends TestKey, ? extends TestValue> entry : entries) {TestValue value = entry.getValue();String testId = value.getTestId();if(StringUtils.isBlank(testId)) {continue;}st.setString(1,testId);st.setString(2,value.getTestName());st.setString(3,value.getTestDate());st.addBatch();if(++testNum>=BATCH_NUM) {st.executeBatch();conn.commit();log.info("[ibor-TestValue] persistence TestValue num:"+testNum);testNum = 0;}log.debug("add or update TestValue:"+value);}if(testNum>0) {log.debug("[ibor-TestValue] persistence TestValue num:"+testNum);st.executeBatch();conn.commit();}} catch (Exception e) {try {conn.rollback();} catch (SQLException e1) {e1.printStackTrace();}throw new CacheLoaderException("Failed to write values to database",e);}finally {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}/**** 用来拼接SQL* @return*/public String getMergeSql() {String baseSql = "MERGE INTO "+ TABLE_NAME +" T"+" USING ( "+"SELECT "+"? AS testId,"+"? AS testName,"+"? AS testDate"+" FROM DUAL "+") T1 ON (T.TEST_ID = T1.testId AND T.TEST_DATE = T1.testDate )"+ " WHEN MATCHED THEN ";String middleSql = "UPDATE SET T.TEST_NAME = T1.testName";String endSql = " WHEN NOT MATCHED THEN "+" INSERT (TEST_ID,TEST_NAME,TEST_DATE)"+" VALUES (T1.testId,T1.testName,T1.testDate)";return baseSql+middleSql+endSql;}
    }
    

    ⒀创建缓存存储工厂,TestCacheStoreFactory
          
    ⒁在ibor-pulsar中创建TestService.java接口类以及实现类TestServiceImpl
         

    package com.kgf.ibor.pulsar.service.impl;import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;import javax.annotation.Resource;
    import javax.cache.Cache.Entry;import org.apache.ignite.IgniteCache;
    import org.apache.ignite.cache.query.QueryCursor;
    import org.apache.ignite.cache.query.SqlFieldsQuery;
    import org.apache.ignite.cache.query.SqlQuery;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;import com.kgf.ibor.common.pulsar.model.PulsarConstant;
    import com.kgf.ibor.common.pulsar.model.TestKey;
    import com.kgf.ibor.common.pulsar.model.TestValue;
    import com.kgf.ibor.pulsar.service.TestService;
    import com.kgf.ibor.pulsar.utils.IgniteUtil;@Component
    public class TestServiceImpl implements TestService {private Logger log = LoggerFactory.getLogger(TestServiceImpl.class);@Resourceprivate IgniteUtil igniteUtil;private IgniteCache<TestKey,TestValue> testInfoCache;@Overridepublic void init() {testInfoCache = igniteUtil.getInstance().cache(PulsarConstant.CACHE_TEST_INFO);}/**** 根据条件查询*/@Overridepublic List<TestValue> queryTestValueById(String testId){try {List<TestValue> list = new ArrayList<TestValue>();SqlQuery<TestKey,TestValue> sqlQuery = new SqlQuery<TestKey,TestValue>(TestValue.class,"testId = ?");try(QueryCursor<Entry<TestKey,TestValue>> cursor = testInfoCache.query(sqlQuery.setArgs(testId))){for (Entry<TestKey, TestValue> entry : cursor) {list.add(entry.getValue());}} return list;} catch (Exception e) {log.error(e.getMessage());return null;}}/**** 支持完全SQL查询*/@Overridepublic String queryTestName(String testId, Integer count) {try {String sql = "select testId from TestValue where TestId = ? order by testDate desc";if(count != null) {sql = sql+" limit ?";}SqlFieldsQuery query = new SqlFieldsQuery(sql);try(QueryCursor<List<?>> cursor = testInfoCache.query(count==null?query.setArgs(testId):query.setArgs(testId,count))){for (List<?> entry : cursor) {return (String) entry.get(0);}} return null;} catch (Exception e) {log.error(e.getMessage());return null;}}/*** 向数据库插入数据* @param lists*/@Overridepublic void insertOrUpdateTestValues(List<TestValue> lists) {HashMap<TestKey,TestValue> map = new HashMap<TestKey,TestValue>();for (TestValue testValue : lists) {map.put(new TestKey(testValue.getTestId(), testValue.getTestDate()),new TestValue(testValue.getTestId(), testValue.getTestName(), testValue.getTestDate()));}if(map.size()>0) {if(map.size()>1) {testInfoCache.putAll(map);}else {testInfoCache.put(map.keySet().iterator().next(), map.values().iterator().next());}}}}
    

    ⒂创建TestDataController测试类:

    package com.kgf.ibor.pulsar.controller;import java.util.ArrayList;
    import java.util.List;import javax.annotation.Resource;import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;import com.kgf.ibor.common.pulsar.model.TestValue;
    import com.kgf.ibor.pulsar.kafka.producer.KafkaProducer;
    import com.kgf.ibor.pulsar.service.TestService;@RequestMapping("testData")
    @RestController
    public class TestDataController {@Autowiredprivate TestService testService;@Resourceprivate KafkaProducer kafkaProducer;@RequestMapping("sendMsg")public void sendMsg() {kafkaProducer.send();}@RequestMapping("sendQuantfin")public void sendQuantfin() {kafkaProducer.sendQuantfin();}@RequestMapping("queryTestValueById")public List<TestValue> queryTestValueById(String testId){return testService.queryTestValueById(testId);}@RequestMapping("queryTestName")public String queryTestName(String testId,Integer count) {return testService.queryTestName(testId,count);}@RequestMapping("insertOrUpdateTestValues")public void insertOrUpdateTestValues() {List<TestValue> lists = new ArrayList<TestValue>();for (int i = 0; i < 10; i++) {lists.add(new TestValue("000"+i+3,"name"+i,"20190803"));}testService.insertOrUpdateTestValues(lists);}
    }
    
  7. ibor-quantfin服务搭建
      ⑴pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kgf.ibor</groupId><artifactId>ibor-quantfin</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><dependency><groupId>com.kgf.ibor</groupId><artifactId>ibor-common</artifactId><version>0.0.1-SNAPSHOT</version></dependency></dependencies><build><plugins><!--指定打包插件,可以将项目打包为可执行jar  --><plugin><groupId> org.springframework.boot </groupId> <artifactId> spring-boot-maven-plugin </artifactId> </plugin><!--指定编译的版本  --><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
    </project>

    ⑵配置文件:
          
     ⑶启动类IborQuantfinApplication:
           
      ⑷项目启动初始化类CapitalIndicatorStarter.java
           
       ⑸IgniteUtil不变:
            
       ⑹IgniteCacheManager.java初始化一个sevice
            
       ⑺创建QuantfinTestService以及实现类QuantfinTestServiceImpl
           

    package com.kgf.ibor.quantfin.service.impl;import java.util.List;import javax.annotation.Resource;import org.apache.ignite.IgniteCache;
    import org.apache.ignite.cache.query.QueryCursor;
    import org.apache.ignite.cache.query.SqlFieldsQuery;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;import com.kgf.ibor.common.pulsar.model.PulsarConstant;
    import com.kgf.ibor.common.pulsar.model.TestKey;
    import com.kgf.ibor.common.pulsar.model.TestValue;
    import com.kgf.ibor.quantfin.service.QuantfinTestService;
    import com.kgf.ibor.quantfin.utils.IgniteUtil;@Component
    public class QuantfinTestServiceImpl implements QuantfinTestService{private Logger log = LoggerFactory.getLogger(QuantfinTestServiceImpl.class);@Resourceprivate IgniteUtil igniteUtil;private IgniteCache<TestKey,TestValue> testInfoCache;@Overridepublic void init() {testInfoCache = igniteUtil.getInstance().cache(PulsarConstant.CACHE_TEST_INFO);}/**** 支持完全SQL查询*/@Overridepublic String queryTestName(String testId, Integer count) {try {String sql = "select testId from TestValue where TestId = ? order by testDate desc";if(count != null) {sql = sql+" limit ?";}SqlFieldsQuery query = new SqlFieldsQuery(sql);try(QueryCursor<List<?>> cursor = testInfoCache.query(count==null?query.setArgs(testId):query.setArgs(testId,count))){for (List<?> entry : cursor) {return (String) entry.get(0);}} return null;} catch (Exception e) {log.error(e.getMessage());return null;}}
    }
    

    ⑻创建QuantfinTestController
         

  8. 数据库表准备:
     
  9. 测试,分别先启动ibor-pulsar和后启动ibor-quantfin
     下面的打印日志,表示两个节点启动成功。
      
     ⑴首先我们调用ibor-pulsar向缓存和数据库中插入数据的接口:
            
           查看数据库:
           
           调用ibor-pulsar中查看缓存中数据的方法:
           
         出现上面的效果说明OK.
    ⑵在调用ibor-quantfin中接口去查询,ibor-pulsar中插入的数据:
         
         出现上面的效果,说明我们的应用是没问题的。
  10. 最后,看看kafka发送消息案例(这里我们是基于topic发送和接收消息的):
     ⑴我们在ibor-pulsar创建一个生产者,将消息发送到ibor-quantfin
          

    package com.kgf.ibor.pulsar.kafka.producer;import java.util.Date;
    import java.util.UUID;import javax.annotation.Resource;import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.kgf.ibor.common.model.KafkaMessage;/**** 生产者* @author KGF**/
    @Component
    public class KafkaProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;private Gson gson = new GsonBuilder().create();private Logger log = LoggerFactory.getLogger(KafkaProducer.class);//发送消息public void send() {KafkaMessage msg = new KafkaMessage();msg.setId(System.currentTimeMillis());msg.setMsg(UUID.randomUUID().toString());msg.setSendTime(new Date());kafkaTemplate.send("topic1", gson.toJson(msg));log.info("[KafkaProducer send msg is]:"+msg);}/*** 发送给Quantfin*/public void sendQuantfin() {KafkaMessage msg = new KafkaMessage();msg.setId(System.currentTimeMillis());msg.setMsg(UUID.randomUUID().toString());msg.setSendTime(new Date());kafkaTemplate.send("topic-kgf", gson.toJson(msg));log.info("[KafkaProducer send msg is]:"+msg);}
    }
    

    ⑵在ibor-quantfin创建一个消费者
          
          

三:KAFKA以及zokeeper别忘了提前安装和启动!

Ignite分布式的内存数据库简单应用相关推荐

  1. apache ignite 分布式内存数据库

    1 简介 ignite是分布式内存网格的一种实现,其基于java平台,具有可持久化,分布式事务,分布式计算等特点,此外还支持丰富的键值存储以及SQL语法(基于h2引擎),可以看成是一个分布式内存数据库 ...

  2. 【noqsl】beansdb的分布式实现~简单粗暴有效~

    版本: beansdb 0.5.3 下载地点: http://code.google.com/p/beansdb/ beansdb是豆瓣用来存储不变小object的支持中等数据规模的nosql的持久型 ...

  3. 分布式 RPC架构简单理解

    RPC框架 RPC(Remote Promote Call) 一种进程间通信方式.允许像调用本地服务一样调用远程服务. RPC框架的主要目标就是让远程服务调用更简单.透明.RPC框架负责屏蔽底层的传输 ...

  4. 基于redis 内存数据库简单使用

    在ecplise中使用内存数据的客端户,前提要准备要下载两个jar包 commons-pool2-2.0.jar jedis-2.4.2.jar 前提准备做好了,那我们就开启redis的服务,打开一个 ...

  5. 多机器使用setnx 设置同一个key_Redisson分布式锁的简单使用

    做一个积极的人 编码.改bug.提升自己 我有一个乐园,面向编程,春暖花开! 一:前言 我在实际环境中遇到了这样一种问题,分布式生成id的问题!因为业务逻辑的问题,我有个生成id的方法,是根据业务标识 ...

  6. 分布式事务——seata简单使用

    1. 本地事务 1.1 事务的基本性质 数据库事务的几个特性:原子性.一致性.隔离性或独立性.持久性简称ACID 1)原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败 2)一致性:数据在事 ...

  7. 关于几种分布式锁的简单介绍

    什么是分布式锁 要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁.进程锁. 1.线程锁 主要用来给方法.代码块加锁.当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段.线程锁只在同 ...

  8. 分布式存储系统-Ceph简单分析

    1 Ceph分布式存储系统分析 Ceph是一个统一的分布式存储系统,可同时提供块.文件和对象3种接口的存储服务.与传统的分布式存储系统不同,它采用了无中心节点的元数据管理方式,因此具有良好的扩展性和线 ...

  9. Hadoop(CDH)分布式环境搭建(简单易懂,绝对有效)

    大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 本文是由alice菌发表在:https://blog.csdn.net/weixin_4431883 ...

  10. 基于Redis的分布式锁的简单实现

    Redis官方给出两种思路 第一种:SET key value [EX seconds] [PX milliseconds] NX 第二种:SETNX+GETSET 首先,分别看一下这几个命令 SET ...

最新文章

  1. 中文字号转换成英文的字号
  2. MySQL ACID及四种隔离级别的解释
  3. oracle util_mail,在oracle 10g中发送电子邮件
  4. java 连接oracle 进行增删改查
  5. 移植ubuntu14.04根文件系统至beaglebone开发板探索
  6. R.java文件无法自动生成的问题
  7. android最新图表框架,Android中绘制图表的开源框架AChartEngine初识
  8. OpenJudge NOI题库 116题
  9. Pano2VR制作全景图缩略图导航
  10. 《巴菲特的第一桶金》读书笔记
  11. volatility使用
  12. 以太网 传统STP生成树的BPDU介绍、STP端口状态介绍与切换过程,STP详细的工作过程。
  13. 淘宝电商为什么转型社群团购,你知道吗?
  14. Arcpy基础入门-2、arcpy的批处理功能
  15. 微商城表结构--记录
  16. Mybatis-Plus用纯注解完成一对多多对多查询
  17. vue3 动态传值给子组件
  18. 苹果7防水吗_苹果手机防水是真的吗?事实很残酷,但还可以抢救一下
  19. Shenango NSDI 2019Achieving High CPU efficiency for Latency-sensitive Datacenter workloads
  20. CSS 2D 3D转换

热门文章

  1. 实战第二步:如何做一份有针对性的竞品分析
  2. 小米手机怎么设置鸿蒙开机动画,小米9开机动画太酷炫了!还不知道怎么设置赶紧来看看!...
  3. 基于PHP的留言板毕业论文,网络留言板
  4. 微信公众号敏感词检测工具
  5. 华为研发岗位两轮面试的准备(本科生,已经拿到offer,月薪20k,15薪)
  6. Java QQ授权第三方登陆
  7. 记录用vs2017安装windows driver失败的坑爹过程
  8. 如何创造一个能和你对话的语音AI?
  9. matlab平滑处理例题,(完整word版)matlab中smooth函数平滑处理数据实例
  10. win10远程控制ubuntu16.04