This article walks through implementing a custom coprocessor endpoint in HBase 0.98.9. The process breaks down into five steps:
1. Define the interface description file (this part is provided by Protocol Buffers)
option java_package = "coprocessor.endpoints.generated";
option java_outer_classname = "RowCounterEndpointProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
      returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
      returns (CountResponse);
}
This file is taken directly from the examples shipped with HBase. If you have worked with a similar IDL before, the syntax should be self-explanatory; if not, consult the protobuf reference manual.
2. Generate the Java interface class from the description file (this part is also provided by Protocol Buffers)

With the interface description file in place, you still need to generate the Java interface class from it. This is done with protoc, the compiler tool that ships with protobuf:

$ protoc --java_out=./ Examples.proto

A quick explanation: the protoc command becomes available once protobuf is installed. Examples.proto is the name of the interface description file written above, and --java_out specifies the directory where the generated Java classes are placed.

If you do not have protobuf installed, you will need to install it; both Windows and Linux builds are available. As an aside, if you ever set up a build environment for 64-bit Hadoop, you will need protobuf there as well.
3. Implement the interface
package coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

public class RowCounterEndpointExample extends RowCountService implements
        Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public RowCounterEndpointExample() {
    }

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void getRowCount(RpcController controller, CountRequest request,
            RpcCallback<CountResponse> done) {
        // FirstKeyOnlyFilter returns only the first cell of each row,
        // which is all we need to count rows.
        Scan scan = new Scan();
        scan.setFilter(new FirstKeyOnlyFilter());
        CountResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            byte[] lastRow = null;
            long count = 0;
            do {
                hasMore = scanner.next(results);
                for (Cell kv : results) {
                    byte[] currentRow = CellUtil.cloneRow(kv);
                    // Count a row only when its key differs from the previous one.
                    if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
                        lastRow = currentRow;
                        count++;
                    }
                }
                results.clear();
            } while (hasMore);
            response = CountResponse.newBuilder().setCount(count).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }

    @Override
    public void getKeyValueCount(RpcController controller,
            CountRequest request, RpcCallback<CountResponse> done) {
        CountResponse response = null;
        InternalScanner scanner = null;
        try {
            // No filter here: every cell in the region is counted.
            scanner = env.getRegion().getScanner(new Scan());
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            long count = 0;
            do {
                hasMore = scanner.next(results);
                for (Cell kv : results) {
                    count++;
                }
                results.clear();
            } while (hasMore);
            response = CountResponse.newBuilder().setCount(count).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to clean up
    }
}
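The getRowCount logic above counts a row only when its key differs from the previous cell's row key, which works because the region scanner returns cells in row order. A minimal standalone sketch of just that comparison (DistinctRowCounter is a hypothetical helper written for illustration, not part of the HBase example):

```java
import java.util.Arrays;
import java.util.List;

public class DistinctRowCounter {
    // Counts distinct row keys in an ordered stream of row-key byte arrays,
    // mirroring the lastRow comparison inside getRowCount.
    public static long countDistinctRows(List<byte[]> rowKeys) {
        byte[] lastRow = null;
        long count = 0;
        for (byte[] currentRow : rowKeys) {
            // A new row starts whenever the key differs from the previous one.
            if (lastRow == null || !Arrays.equals(lastRow, currentRow)) {
                lastRow = currentRow;
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<byte[]> rows = Arrays.asList(
                "row1".getBytes(), "row1".getBytes(),  // two cells, same row
                "row2".getBytes(),
                "row3".getBytes(), "row3".getBytes());
        System.out.println(countDistinctRows(rows)); // prints 3
    }
}
```

Note that this only holds for sorted input; if the same row key reappeared later in the stream it would be counted twice, which is why the real endpoint relies on the scanner's ordering guarantee.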
4. Register the endpoint (an HBase feature; registration is done either through the configuration file or through the table schema)

For this part, see HBase: The Definitive Guide, which is what this walkthrough followed.
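As a sketch of the two options: a global registration adds the class to the hbase.coprocessor.region.classes property in hbase-site.xml on every region server (followed by a restart), while a per-table registration sets a coprocessor attribute from the HBase shell. The jar path and priority below are placeholder values, not ones from this article:

```
# Global: hbase-site.xml on each region server (class must be on the classpath)
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RowCounterEndpointExample</value>
</property>

# Per-table: from the HBase shell; the attribute value format is
#   <jar path>|<fully qualified class>|<priority>|<optional args>
alter 'testtable', METHOD => 'table_att', 'coprocessor' => 'hdfs:///user/hbase/coprocessor.jar|coprocessor.RowCounterEndpointExample|1001|'

# Verify the attribute was applied
describe 'testtable'
```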
5. Test the client call
package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ServiceException;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

import util.HBaseHelper;

public class RowCounterEndpointClientExample {

    public static void main(String[] args) throws ServiceException, Throwable {
        Configuration conf = HBaseConfiguration.create();
        HBaseHelper helper = HBaseHelper.getHelper(conf);
        // helper.dropTable("testtable");
        // helper.createTable("testtable", "colfam1", "colfam2");
        System.out.println("Adding rows to table...");
        helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");

        HTable table = new HTable(conf, "testtable");
        final CountRequest request = CountRequest.getDefaultInstance();
        final Batch.Call<RowCountService, Long> call =
                new Batch.Call<RowCountService, Long>() {
            public Long call(RowCountService counter) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<CountResponse> rpcCallback =
                        new BlockingRpcCallback<CountResponse>();
                counter.getRowCount(controller, request, rpcCallback);
                CountResponse response = rpcCallback.get();
                if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                }
                return (response != null && response.hasCount())
                        ? response.getCount() : 0;
            }
        };
        // Invoke the endpoint on every region of the table (null start/stop row)
        // and collect one partial count per region.
        Map<byte[], Long> results = table.coprocessorService(
                RowCountService.class, null, null, call);
        for (byte[] b : results.keySet()) {
            System.err.println(Bytes.toString(b) + ":" + results.get(b));
        }
    }
}
That covers the full process of implementing endpoints in HBase 0.98.9.
Original article: https://my.oschina.net/psuyun/blog/363634