本篇文章为大家展示了Apache Atlas是如何构建自己的API,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
Apache Atlas是一个优秀的服务治理组件,用于企业Hadoop集群上的数据治理和元数据管理的数据治理工具。接下来我们将讨论构建自己的Java API,这些Java API可使用Apache atlas客户端与Apache Atlas交互以在其中创建新的实体和类型。
以下依赖项可用于pom.xml文件
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<version>0.7-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
<version>0.7-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
<version>0.7-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-repository</artifactId>
<version>0.7-incubating</version>
</dependency>
Apache Atlas客户端使用atlas-application属性在我们的API和Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties中
######### Security Properties #########
# SSL config
atlas.enableTLS=false
######### Server Properties #########
atlas.rest.address=http://192.168.5.95:21000
atlas.hook.demo.kafka.retries=1
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas
要与Apache atlas Server,baseUrl和用户名创建连接,必须在AtlasClient构造函数中传递密码
final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});
public class AtlasTypesTest {
final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});
static final String DATABASE_TYPE = "DB_Sync";
static final String COLUMN_TYPE = "Column_Sync";
static final String TABLE_TYPE = "Table_Sync";
static final String VIEW_TYPE = "View_Sync";
public static final String DB_ATTRIBUTE = "db";
static final String STORAGE_DESC_TYPE = "StorageDesc";
public static final String COLUMNS_ATTRIBUTE = "columns";
public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";
private static final String[] TYPES =
{DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess",
"ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"};
/**
* 组织定义types
* @return
*/
TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()),
attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()));
HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
.createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()),
attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName()));
HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),
new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()),
attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", DataTypes.LONG_TYPE.getName()),
attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),
attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()),
attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),
new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null));
HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
.createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null));
return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(),
ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));
}
private void createTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
System.out.println("typesAsJSON = " + typesAsJSON);
atlasClient.createType(typesAsJSON);
verifyTypesCreated();
}
private void verifyTypesCreated() throws Exception {
List<String> types = atlasClient.listTypes();
for (String type : TYPES) {
assert types.contains(type);
}
}
AttributeDefinition attrDef(String name, String dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}
AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,
String reverseAttributeName) {
return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);
}
@Test
public void createNewTypes() throws Exception {
createTypes();
}
}
public class AtlasEntitiesTest {
final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});
/**
* 创建实例并返创建的Id对象
* @param referenceable
* @return
* @throws Exception
*/
private Id createInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
List<String> guids = atlasClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
referenceable.getTypeName());
}
/**
* 创建数据库实例并返创建的数据库Id对象
* @param name
* @param description
* @param owner
* @param locationUri
* @param traitNames
* @return
* @throws Exception
*/
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
return createInstance(referenceable);
}
/**
* 创建列的实例并返创建的列的实例对象
* @param name
* @param dataType
* @param comment
* @param traitNames
* @return
* @throws Exception
*/
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);
return referenceable;
}
/**
* 创建表的实例并返创建的表的Id对象
* @param name
* @param description
* @param dbId
* @param sd
* @param owner
* @param tableType
* @param columns
* @param traitNames
* @return
* @throws Exception
*/
Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
referenceable.set("sd", sd);
referenceable.set("columns", columns);
return createInstance(referenceable);
}
/**
* 创建视图的实例并返创建的视图的Id对象
* @param name
* @param dbId
* @param inputTables
* @param traitNames
* @return
* @throws Exception
*/
Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("db", dbId);
referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);
return createInstance(referenceable);
}
/**
* 原始存储描述符
* @param location
* @param inputFormat
* @param outputFormat
* @param compressed
* @return
* @throws Exception
*/
Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location);
referenceable.set("inputFormat", inputFormat);
referenceable.set("outputFormat", outputFormat);
referenceable.set("compressed", compressed);
return referenceable;
}
@Test
public void createEntities() throws Exception {
//创建数据库实例
Id syncDB = database("sy_sync", "Sync Database", "root", "");
//存储描述符
Referenceable sd =
storageDescriptor("", "TextInputFormat", "TextOutputFormat",
true);
//创建列实例
//1、数据源
List<Referenceable> databaseColumns = ImmutableList
.of(column("id", "long", "id"),
column("name", "string", "name"),
column("type", "string", "type"),
column("url", "string", "url"),
column("database_name", "string", "database name"),
column("username", "string", "username"),
column("password","string","password"),
column("description", "string", "description"),
column("create_time", "string", "create time"),
column("update_time", "string", "update time"),
column("create_id", "long", "user id"),
column("update_id", "long", "user id"));
//2、同步文件夹
List<Referenceable> syncFolderColumns = ImmutableList
.of(column("id", "long", "id"),
column("name", "string", "name"),
column("description", "string", "description"),
column("create_time", "string", "create time"),
column("update_time", "string", "update time"),
column("create_id", "long", "user id"),
column("update_id", "long", "user id"));
//创建表实例
Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);
Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);
//创建视图实例
}
@Test
public void getEntity() throws AtlasServiceException {
Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");
System.out.println(InstanceSerialization.toJson(referenceable, true));
}
}
上述内容就是Apache Atlas是如何构建自己的API,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/145473/blog/4527830