RedisGears JVM 快速入门
学习如何使用 Java 与 RedisGears 交互的快速入门指南。
先决条件
此快速入门需要以下内容:
- 一个安装了 RedisGears 模块和 JVM 插件 并 在数据库上启用 的 Redis Enterprise 集群
redis-cli
,可连接到 Redis 数据库
教程
创建 Maven 项目
-
创建一个新的 Maven 项目。
-
将以下部分添加到 pom.xml 文件中
<repositories> <repository> <id>snapshots-repo</id> <url>https://oss.sonatype.org/content/repositories/snapshots</url> </repository> </repositories> <dependencies> <dependency> <groupId>com.redislabs</groupId> <artifactId>gear_runtime</artifactId> <version>0.0.3-SNAPSHOT</version> </dependency> </dependencies>
构建 JAR
使用 Maven 命令行工具或 IDE 插件将代码编译并打包成 JAR 文件。
$ mvn package
上传 JAR
将 JAR 文件上传到 Redis Enterprise 集群中的一个节点。在运行代码时,需要使用目标文件路径。
运行 RedisGears Java 代码
使用 RG.JEXECUTE
命令运行代码。
$ redis-cli -x -h {host} -p {port} RG.JEXECUTE {package.MainClass} < {filepath}/{JAR name}.jar
GearsBuilder.run()
时,RG.JEXECUTE
会立即运行代码。但是,如果使用
GearsBuilder.register()
,RG.JEXECUTE
仅在成功注册时输出 OK
消息。已注册的代码将在某些数据库事件发生时运行。示例代码
您可以使用这些代码示例和自己的 Maven 项目来尝试使用 RedisGears JVM 插件进行批处理或事件处理。
批处理
如果在代码中使用 GearsBuilder.run()
函数,则在使用 RG.JEXECUTE
运行 JAR 文件时,添加到管道的函数将只执行一次。
以下示例计算存储在数据库中的所有餐厅评论的平均评分。
将数据添加到数据库
-
使用
redis-cli
连接到数据库$ redis-cli -h <host> -p <port>
-
使用
HSET
命令将一些评论哈希添加到数据库127.0.0.1:12000> HSET review:1 user "Alex L" message "My new favorite restaurant!" rating 5 (integer) 3 127.0.0.1:12000> HSET review:2 user "Anonymous user" message "Kind of overpriced" rating 2 (integer) 3 127.0.0.1:12000> HSET review:3 user "Francis J" message "They have a really unique menu." rating 4 (integer) 3 127.0.0.1:12000> exit
示例代码
import java.io.Serializable;
import gears.GearsBuilder;
import gears.readers.KeysReader;
import gears.records.KeysReaderRecord;
public class Reviews implements Serializable
{
private static final long serialVersionUID = 1L;
int count; // Total number of reviews
int ratingsSum; // Sum of all review ratings
// Reviews constructor
public Reviews(int count, int ratingsSum) {
this.count = count;
this.ratingsSum = ratingsSum;
}
public static void main(String args[])
{
// Create the reader that will pass data to the pipe
KeysReader reader = new KeysReader();
// Create the data pipe builder
GearsBuilder<KeysReaderRecord> gb = GearsBuilder.CreateGearsBuilder(reader);
gb.filter(r->{
// Filter out any keys that are not reviews
return r.getKey().startsWith("review:");
}).map(r->{
// Extract the rating field
return r.getHashVal().get("rating");
})
.accumulate(new Reviews(0, 0), (accumulator, record)-> {
// Count the reviews and add up all of their ratings
accumulator.count++;
accumulator.ratingsSum += Integer.parseInt(record);
return accumulator;
}).map(r->{
// Calculate the average rating
return Double.valueOf(((double) r.ratingsSum) / r.count);
});
// Run the data through the pipeline immediately
gb.run();
}
}
示例输出
$ redis-cli -x -h {host} -p {port} \
RG.JEXECUTE com.domain.packagename.Reviews < /tmp/rgjvmtest-0.0.1-SNAPSHOT.jar
1) 1) "3.6666666666666665"
2) (empty array)
事件处理
如果在代码中使用 GearsBuilder.register()
函数,则在每次发生特定数据库事件时,添加到管道的函数都会运行。
以下示例注册一个函数管道,以便在每次将新的个人哈希添加到数据库时自动更新最大年龄。
示例代码
import gears.GearsBuilder;
import gears.readers.KeysReader;
import gears.records.KeysReaderRecord;
public class App
{
public static void main(String args[])
{
// Create the reader that will pass data to the pipe
KeysReader reader = new KeysReader();
// Create the data pipe builder
GearsBuilder<KeysReaderRecord> gb = GearsBuilder.CreateGearsBuilder(reader);
// Only process keys that start with "person:"
gb.filter(r->{
return r.getKey().startsWith("person:");
});
// Compare each person's age to the current maximum age
gb.foreach(r->{
String newAgeStr = r.getHashVal().get("age");
int newAge = Integer.parseInt(newAgeStr);
// Get the current maximum age
String maxAgeKey = "age:maximum";
String maxAgeStr = (String) GearsBuilder.execute("GET", maxAgeKey);
int maxAge = 0; // Initialize to 0
if (maxAgeStr != null) {
// Convert the maximum age to an integer
maxAge = Integer.parseInt(maxAgeStr);
}
// Update the maximum age if a new age is higher
if (newAge > maxAge) {
GearsBuilder.execute("SET", maxAgeKey, newAgeStr);
}
});
// Store this pipeline of functions and
// run them when a new person key is added
gb.register(ExecutionMode.SYNC);
// Note: ExecutionMode defaults to ASYNC
// if you call register() without any arguments
}
}
示例事件处理
使用 RG.JEXECUTE
命令注册代码后,添加一些数据到数据库并检查 age:maximum
的值,以验证其是否正确运行。
-
使用
redis-cli
连接到数据库$ redis-cli -h <host> -p <port>
-
使用
HSET
添加一个表示个人的哈希127.0.0.1:12000> HSET person:1 name "Alex" age 24 (integer) 2
-
当前的
age:maximum
值应该与 Alex 的年龄匹配127.0.0.1:12000> GET age:maximum "24"
-
添加另一个年龄更大的个人,然后检查
age:maximum
是否自动更新127.0.0.1:12000> HSET person:2 name "Morgan" age 45 (integer) 2 127.0.0.1:12000> GET age:maximum "45"
-
添加一个年龄更小的个人,并验证
age:maximum
是否没有改变127.0.0.1:12000> HSET person:3 name "Lee" age 31 (integer) 2 127.0.0.1:12000> GET age:maximum "45"