RedisGears JVM 快速入门

学习如何使用 Java 与 RedisGears 交互的快速入门指南。

先决条件

此快速入门需要以下内容:

教程

创建 Maven 项目

  1. 创建一个新的 Maven 项目

  2. 将以下部分添加到 pom.xml 文件中

    <repositories>
        <repository>
            <id>snapshots-repo</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        </repository>
    </repositories>
    
    <dependencies>
        <dependency>
            <groupId>com.redislabs</groupId>
            <artifactId>gear_runtime</artifactId>
            <version>0.0.3-SNAPSHOT</version>
        </dependency>
    </dependencies>
    
  3. 将以下示例代码添加到项目的 main 函数中:批处理事件处理

构建 JAR

使用 Maven 命令行工具或 IDE 插件将代码编译并打包成 JAR 文件。

$ mvn package

上传 JAR

将 JAR 文件上传到 Redis Enterprise 集群中的一个节点。在运行代码时,需要使用目标文件路径。

运行 RedisGears Java 代码

使用 RG.JEXECUTE 命令运行代码。

$ redis-cli -x -h {host} -p {port} RG.JEXECUTE {package.MainClass} < {filepath}/{JAR name}.jar
注意
当使用 GearsBuilder.run() 时,RG.JEXECUTE 会立即运行代码。

但是,如果使用 GearsBuilder.register()RG.JEXECUTE 仅在成功注册时输出 OK 消息。已注册的代码将在某些数据库事件发生时运行。

示例代码

您可以使用这些代码示例和自己的 Maven 项目来尝试使用 RedisGears JVM 插件进行批处理或事件处理。

批处理

如果在代码中使用 GearsBuilder.run() 函数,则在使用 RG.JEXECUTE 运行 JAR 文件时,添加到管道的函数将只执行一次。

以下示例计算存储在数据库中的所有餐厅评论的平均评分。

将数据添加到数据库

  1. 使用 redis-cli 连接到数据库

    $ redis-cli -h <host> -p <port>
    
  2. 使用 HSET 命令将一些评论哈希添加到数据库

    127.0.0.1:12000> HSET review:1 user "Alex L" message "My new favorite restaurant!" rating 5
    (integer) 3
    127.0.0.1:12000> HSET review:2 user "Anonymous user" message "Kind of overpriced" rating 2
    (integer) 3
    127.0.0.1:12000> HSET review:3 user "Francis J" message "They have a really unique menu." rating 4
    (integer) 3
    127.0.0.1:12000> exit
    

示例代码

import java.io.Serializable;
import gears.GearsBuilder;
import gears.readers.KeysReader;
import gears.records.KeysReaderRecord;

public class Reviews implements Serializable
{

	private static final long serialVersionUID = 1L;
	int count; // Total number of reviews
	int ratingsSum; // Sum of all review ratings
	
    // Reviews constructor
	public Reviews(int count, int ratingsSum) {
		this.count = count;
		this.ratingsSum = ratingsSum;
	}

    public static void main(String args[]) 
    {  
        // Create the reader that will pass data to the pipe
        KeysReader reader = new KeysReader();
        
        // Create the data pipe builder
        GearsBuilder<KeysReaderRecord> gb = GearsBuilder.CreateGearsBuilder(reader);
        
		gb.filter(r->{
			// Filter out any keys that are not reviews
			return r.getKey().startsWith("review:");
		}).map(r->{
			// Extract the rating field
			return r.getHashVal().get("rating");
		})
		.accumulate(new Reviews(0, 0), (accumulator, record)-> {
			// Count the reviews and add up all of their ratings
			accumulator.count++;
			accumulator.ratingsSum += Integer.parseInt(record);
			return accumulator;
		}).map(r->{
			// Calculate the average rating
			return Double.valueOf(((double) r.ratingsSum) / r.count);
		});
             
        // Run the data through the pipeline immediately
        gb.run();
    }
}

示例输出

$ redis-cli -x -h {host} -p {port} \
    RG.JEXECUTE com.domain.packagename.Reviews < /tmp/rgjvmtest-0.0.1-SNAPSHOT.jar
1) 1) "3.6666666666666665"
2) (empty array)

事件处理

如果在代码中使用 GearsBuilder.register() 函数,则在每次发生特定数据库事件时,添加到管道的函数都会运行。

以下示例注册一个函数管道,以便在每次将新的个人哈希添加到数据库时自动更新最大年龄。

示例代码

import gears.GearsBuilder;
import gears.readers.KeysReader;
import gears.records.KeysReaderRecord;

public class App 
{
    public static void main(String args[]) 
    {  
        // Create the reader that will pass data to the pipe
        KeysReader reader = new KeysReader();
        
        // Create the data pipe builder
        GearsBuilder<KeysReaderRecord> gb = GearsBuilder.CreateGearsBuilder(reader);

        // Only process keys that start with "person:"
        gb.filter(r->{
        	return r.getKey().startsWith("person:");
       	});

        // Compare each person's age to the current maximum age
        gb.foreach(r->{
        	String newAgeStr = r.getHashVal().get("age");
        	int newAge = Integer.parseInt(newAgeStr);
        	
        	// Get the current maximum age
        	String maxAgeKey = "age:maximum";
        	String maxAgeStr = (String) GearsBuilder.execute("GET", maxAgeKey);
        	
        	int maxAge = 0; // Initialize to 0
        	if (maxAgeStr != null) {
                // Convert the maximum age to an integer
        		maxAge = Integer.parseInt(maxAgeStr);
        	}

        	// Update the maximum age if a new age is higher
        	if (newAge > maxAge) {               
        		GearsBuilder.execute("SET", maxAgeKey, newAgeStr); 
        	}
        });
        
        // Store this pipeline of functions and 
        // run them when a new person key is added
        gb.register(ExecutionMode.SYNC);
        // Note: ExecutionMode defaults to ASYNC 
        // if you call register() without any arguments
    }
}

示例事件处理

使用 RG.JEXECUTE 命令注册代码后,添加一些数据到数据库并检查 age:maximum 的值,以验证其是否正确运行。

  1. 使用 redis-cli 连接到数据库

    $ redis-cli -h <host> -p <port>
    
  2. 使用 HSET 添加一个表示个人的哈希

    127.0.0.1:12000> HSET person:1 name "Alex" age 24
    (integer) 2
    
  3. 当前的 age:maximum 值应该与 Alex 的年龄匹配

    127.0.0.1:12000> GET age:maximum
    "24"
    
  4. 添加另一个年龄更大的个人,然后检查 age:maximum 是否自动更新

    127.0.0.1:12000> HSET person:2 name "Morgan" age 45
    (integer) 2
    127.0.0.1:12000> GET age:maximum
    "45"
    
  5. 添加一个年龄更小的个人,并验证 age:maximum 是否没有改变

    127.0.0.1:12000> HSET person:3 name "Lee" age 31
    (integer) 2
    127.0.0.1:12000> GET age:maximum
    "45"
    
RATE THIS PAGE
Back to top ↑