在单机环境下实现字符串追加函数(Pulsar 2.4.2版本)
1 启动单机Pulsar
$ bin/pulsar-daemon start standalone
2 创建函数
1) 准备环境
项目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 创建JAVA函数(此函数用于数据源来的topic schema是string,输出的tiopic schema是string)
导出jar包,放到pulsar服务器目录下,本例子放在 /data/jar/下
3)使用命令行工具加载函数到Pulsar,
bin/pulsar-admin functions create \
--classname test.AppStrFunction \
--jar /data/jar/pf.jar \
--inputs persistent://public/default/tlstest \
--output persistent://public/default/teststr \
--tenant public \
--namespace default \
--name appStrFunction
参数说明:
参数 | 说明 |
functions | 通知 pulsar broker,函数操作 |
create | 创建函数,默认创建成功后启动 |
classname | 函数类名称,需要加上包名 |
jar | 指定 jar 包的运行路径 |
inputs | 指定 函数 数据的来源在哪里,支持多个 topics 作为输入 |
output | 如果该 函数 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出 |
tenant | 指定该 函数 运行的租户名 |
namespace | 指定该 函数 运行的命名空间 |
name | 指定该 函数 运行的名称 |
停止函数
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
启动函数
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
删除函数
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函数的日志在 pulsar安装目录 /logs/functions下
3 测试函数
根据前边函数已成功加载启动
1)向tlstest主题发送消息
import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; public class SendMsgTest{ public static void main(String[] args){ String url="pulsar://192.168.1.48:6650"; try{ PulsarClient client =PulsarClient.builder() .serviceUrl(url) .connectionTimeout(10,TimeUnit.SECONDS) .build(); Producer<String> producer=client.newProducer(Schema.STRING) .topic("tlstest") .sendTimeout(10,TimeUnit.SECONDS) .producerName("senduser") .create(); producer.send("this is a book"); System.out.print("send ok"); client.close(); }catch(Exception e){ e.printStackTrace(); } } }
2)读取teststr主题消息
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.JSONSchema; import schema.OrderModel; import com.alibaba.fastjson.JSON; public class RecFunTest { public static void main(String[] args) { String url = "http://192.168.1.48:8080"; try{ PulsarClient client =PulsarClient.builder() .serviceUrl(url) .build(); Consumer<String> consumer=client.newConsumer(Schema.STRING) .topic("teststr") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive)//订阅模式 Exclusive(独占,默认模式) Failover(灾备)Shared(共享) .subscriptionName("wbq")//订阅者名称 .subscribe(); while (true) { Message<String> mondmsg = consumer.receive(); String msg=mondmsg.getValue(); System.out.println("receive message=:"+msg); } }catch(Exception e){ e.printStackTrace(); } } }
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。