基于Ignite的ContinuousQuery

作者: SofiyaJ | 来源:发表于2018-03-27 15:43 被阅读31次

知识准备

持续查询语言(CQL, continuous query language)类似于:
内存数据库+视图+触发器 的解决方案。
简单来说,一有符合条件的对象进入查询结果集,就执行一次回调函数。

本文的实现是基于C/S模式的,即Client端先按照一定规则从Server端查询数据,返回结果集后,Server端继续添加符合条件的数据,Client端仍然可以实时查询返回结果。
持续查询可以监听缓存中数据的变更。持续查询一旦启动,如果有,就会收到符合查询条件的数据变化的通知。

主要maven依赖:


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-core</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-spring</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-indexing</artifactId>
            <version>${ignite.version}</version>
        </dependency>
    <properties>
        <ignite.version>2.4.0</ignite.version>
    </properties>

TIPS:本工程使用的ignite的版本是2.4.0,ignite更新迭代较快,版本见得差异还是很大的。

主要代码实现

Server端实现:

github源代码

package xx.xx.searchengine;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: wangjie
 * @Description:
 * @Date: Created in 10:13 2018/3/27
 */

@SpringBootApplication
@RestController
public class ServerApplication {

    //cache name
    private static final String CACHE_NAME = "serverCache";

    private static Ignite ignite = Ignition.start("example-cache.xml");

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(ServerApplication.class,args);

    }

    @RequestMapping(value = "/testIgnite",method = RequestMethod.GET)
    public String testIgnite(Integer key,String value) throws InterruptedException{
        ignite.active(true);
        System.out.println("*******insert data begins*********");

        try(IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)){
            cache.put(key,value);
            Thread.sleep(2000);
        }
        return "*******insert data succeed*********";
    }
}

example-cache.xml主要配置(在初始化文件之后添加的):

 <property name="clientMode" value="false"/>
 <property name="peerClassLoadingEnabled" value="true"/>

Client端实现:

github源代码

package xx.xx.searchengine;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;

/**
 * @Author: wangjie
 * @Description:
 * @Date: Created in 10:26 2018/3/27
 */
@SpringBootApplication
public class ClientApplication {

    //cache name
    private static final String CACHE_NAME = "serverCache";

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(ClientApplication.class, args);


        try (Ignite ignite = Ignition.start("example-cache.xml")) {
            ignite.active(true);
            System.out.println("**********Cache continuous query example started**********");

            try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {

                // Create new continuous query.
                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
                //init query
                qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
                    @Override
                    public boolean apply(Integer key, String val) {
                        return key > 0;
                    }
                }));
                 //set local listener
                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
                    @Override
                    public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                        for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) {
                            System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                        }
                    }
                });


                try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
                    // Iterate through existing data.
                    for (Cache.Entry<Integer, String> e : cur) {
                        System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                        Thread.sleep(2000000000);
                    }
                } finally {
                    ignite.destroyCache(CACHE_NAME);
                }
            }
        }
    }
}
  • 初始化查询
    当要执行持续查询时,在将持续查询注册在集群中以及开始接收更新之前,可以有选择地指定一个初始化查询。
    初始化查询可以通过ContinuousQuery.setInitialQuery(Query)方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。
  • 远程过滤器
    这个过滤器在给定键对应的主和备节点上执行,然后评估更新是否需要作为一个事件传播给该查询的本地监听器。
    如果过滤器返回true,那么本地监听器就会收到通知,否则事件会被忽略。产生更新的特定主和备节点,会在主/备节点以及应用端执行的本地监听器之间,减少不必要的网络流量。
    远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)方法进行设置。
  • 本地监听器
    当缓存被修改时(一个条目被插入、更新或者删除),更新对应的事件就会发送给持续查询的本地监听器,之后应用就可以做出对应的反应。
    当事件通过了远程过滤器,他们就会被发送给客户端,通知哪里的本地监听器。
    本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)方法设置的。

example-cache.xml主要配置(在初始化文件之后添加的):

  <property name="clientMode" value="true"/>
  <property name="peerClassLoadingEnabled" value="true"/>

启动程序,测试连续查询

启动Server端:

Server.png

启动Client端:

Client.png

在postman中发送get请求:

http://localhost:8080/testIgnite?key=26&value="hahahah"

postman.png

查看Client端控制台的输出信息:

Client-updata.png

关于ignite的其它文章:
Ignite CS 模式 java初探
Ignite 之计算运用的 Hello world

程序媛小白一枚,如有错误,烦请批评指正!(#.#)

相关文章

网友评论

本文标题:基于Ignite的ContinuousQuery

本文链接:https://www.haomeiwen.com/subject/jscscftx.html