在单机环境下实现字符串追加函数(Pulsar 2.4.2版本)
1 启动单机Pulsar
$ bin/pulsar-daemon start standalone
2 创建函数
1) 准备环境
项目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 创建JAVA函数(此函数用于数据源来的topic schema是string,输出的tiopic schema是string)
导出jar包,放到pulsar服务器目录下,本例子放在 /data/jar/下
3)使用命令行工具加载函数到Pulsar,
bin/pulsar-admin functions create \
--classname test.AppStrFunction \
--jar /data/jar/pf.jar \
--inputs persistent://public/default/tlstest \
--output persistent://public/default/teststr \
--tenant public \
--namespace default \
--name appStrFunction
参数说明:
参数 | 说明 |
functions | 通知 pulsar broker,函数操作 |
create | 创建函数,默认创建成功后启动 |
classname | 函数类名称,需要加上包名 |
jar | 指定 jar 包的运行路径 |
inputs | 指定 函数 数据的来源在哪里,支持多个 topics 作为输入 |
output | 如果该 函数 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出 |
tenant | 指定该 函数 运行的租户名 |
namespace | 指定该 函数 运行的命名空间 |
name | 指定该 函数 运行的名称 |
停止函数
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
启动函数
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
删除函数
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函数的日志在 pulsar安装目录 /logs/functions下
3 测试函数
根据前边函数已成功加载启动
1)向tlstest主题发送消息
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
public class SendMsgTest{
public static void main(String[] args){
String url="pulsar://192.168.1.48:6650";
try{
PulsarClient client =PulsarClient.builder()
.serviceUrl(url)
.connectionTimeout(10,TimeUnit.SECONDS)
.build();
Producer<String> producer=client.newProducer(Schema.STRING)
.topic("tlstest")
.sendTimeout(10,TimeUnit.SECONDS)
.producerName("senduser")
.create();
producer.send("this is a book");
System.out.print("send ok");
client.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
2)读取teststr主题消息
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import schema.OrderModel;
import com.alibaba.fastjson.JSON;
public class RecFunTest {
public static void main(String[] args) {
String url = "http://192.168.1.48:8080";
try{
PulsarClient client =PulsarClient.builder()
.serviceUrl(url)
.build();
Consumer<String> consumer=client.newConsumer(Schema.STRING)
.topic("teststr")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)//订阅模式 Exclusive(独占,默认模式) Failover(灾备)Shared(共享)
.subscriptionName("wbq")//订阅者名称
.subscribe();
while (true) {
Message<String> mondmsg = consumer.receive();
String msg=mondmsg.getValue();
System.out.println("receive message=:"+msg);
}
}catch(Exception e){
e.printStackTrace();
}
}
}
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。