本篇文章为大家展示了Apache Atlas是如何构建自己的API,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
Apache Atlas是一个优秀的服务治理组件,用于企业Hadoop集群上的数据治理和元数据管理的数据治理工具。接下来我们将讨论构建自己的Java API,这些Java API可使用Apache atlas客户端与Apache Atlas交互以在其中创建新的实体和类型。
以下依赖项可用于pom.xml文件
<dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-client</artifactId> <version>0.7-incubating</version> </dependency> <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-typesystem</artifactId> <version>0.7-incubating</version> </dependency> <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-notification</artifactId> <version>0.7-incubating</version> </dependency> <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-repository</artifactId> <version>0.7-incubating</version> </dependency>
Apache Atlas客户端使用atlas-application属性在我们的API和Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties中
######### Security Properties ######### # SSL config atlas.enableTLS=false ######### Server Properties ######### atlas.rest.address=http://192.168.5.95:21000 atlas.hook.demo.kafka.retries=1 atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181 atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092 atlas.kafka.zookeeper.session.timeout.ms=4000 atlas.kafka.zookeeper.connection.timeout.ms=2000 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=1000 atlas.kafka.hook.group.id=atlas
要与Apache atlas Server,baseUrl和用户名创建连接,必须在AtlasClient构造函数中传递密码
final AtlasClient atlasClient = new AtlasClient (new String[]{"http://192.168.5.95:21000"}, new String[]{"admin", "admin"});
public class AtlasTypesTest { final AtlasClient atlasClient = new AtlasClient (new String[]{"http://192.168.5.95:21000"}, new String[]{"admin", "admin"}); static final String DATABASE_TYPE = "DB_Sync"; static final String COLUMN_TYPE = "Column_Sync"; static final String TABLE_TYPE = "Table_Sync"; static final String VIEW_TYPE = "View_Sync"; public static final String DB_ATTRIBUTE = "db"; static final String STORAGE_DESC_TYPE = "StorageDesc"; public static final String COLUMNS_ATTRIBUTE = "columns"; public static final String INPUT_TABLES_ATTRIBUTE = "inputTables"; private static final String[] TYPES = {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess", "ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"}; /** * 组织定义types * @return */ TypesDef createTypeDefinitions() { HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null, TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()), attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName())); HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()), attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName())); HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"), new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null), new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null), attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()), attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", DataTypes.LONG_TYPE.getName()), attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()), attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()), attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()), new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE), Multiplicity.COLLECTION, true, null)); HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"), new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE), Multiplicity.COLLECTION, false, null)); return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(), ImmutableList.of(), ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef)); } private void createTypes() throws Exception { TypesDef typesDef = createTypeDefinitions(); String typesAsJSON = TypesSerialization.toJson(typesDef); System.out.println("typesAsJSON = " + typesAsJSON); atlasClient.createType(typesAsJSON); verifyTypesCreated(); } private void verifyTypesCreated() throws Exception { List<String> types = atlasClient.listTypes(); for (String type : TYPES) { assert types.contains(type); } } AttributeDefinition attrDef(String name, String dT) { return attrDef(name, dT, Multiplicity.OPTIONAL, false, null); } AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite, String reverseAttributeName) { return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName); } @Test public void createNewTypes() throws Exception { createTypes(); } }
public class AtlasEntitiesTest { final AtlasClient atlasClient = new AtlasClient (new String[]{"http://192.168.5.95:21000"}, new String[]{"admin", "admin"}); /** * 创建实例并返创建的Id对象 * @param referenceable * @return * @throws Exception */ private Id createInstance(Referenceable referenceable) throws Exception { String typeName = referenceable.getTypeName(); String entityJSON = InstanceSerialization.toJson(referenceable, true); System.out.println("Submitting new entity= " + entityJSON); List<String> guids = atlasClient.createEntity(entityJSON); System.out.println("created instance for type " + typeName + ", guid: " + guids); return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(), referenceable.getTypeName()); } /** * 创建数据库实例并返创建的数据库Id对象 * @param name * @param description * @param owner * @param locationUri * @param traitNames * @return * @throws Exception */ Id database(String name, String description, String owner, String locationUri, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); referenceable.set("name", name); referenceable.set("description", description); referenceable.set("owner", owner); referenceable.set("locationUri", locationUri); referenceable.set("createTime", System.currentTimeMillis()); return createInstance(referenceable); } /** * 创建列的实例并返创建的列的实例对象 * @param name * @param dataType * @param comment * @param traitNames * @return * @throws Exception */ Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); referenceable.set("name", name); referenceable.set("dataType", dataType); referenceable.set("comment", comment); return referenceable; } /** * 创建表的实例并返创建的表的Id对象 * @param name * @param description * @param dbId * @param sd * @param owner * @param tableType * @param columns * @param traitNames * @return * @throws Exception */ Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType, List<Referenceable> columns, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames); referenceable.set("name", name); referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set("description", description); referenceable.set("owner", owner); referenceable.set("tableType", tableType); referenceable.set("createTime", System.currentTimeMillis()); referenceable.set("lastAccessTime", System.currentTimeMillis()); referenceable.set("retention", System.currentTimeMillis()); referenceable.set("db", dbId); referenceable.set("sd", sd); referenceable.set("columns", columns); return createInstance(referenceable); } /** * 创建视图的实例并返创建的视图的Id对象 * @param name * @param dbId * @param inputTables * @param traitNames * @return * @throws Exception */ Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames); referenceable.set("name", name); referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set("db", dbId); referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables); return createInstance(referenceable); } /** * 原始存储描述符 * @param location * @param inputFormat * @param outputFormat * @param compressed * @return * @throws Exception */ Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed) throws Exception { Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE); referenceable.set("location", location); referenceable.set("inputFormat", inputFormat); referenceable.set("outputFormat", outputFormat); referenceable.set("compressed", compressed); return referenceable; } @Test public void createEntities() throws Exception { //创建数据库实例 Id syncDB = database("sy_sync", "Sync Database", "root", ""); //存储描述符 Referenceable sd = storageDescriptor("", "TextInputFormat", "TextOutputFormat", true); //创建列实例 //1、数据源 List<Referenceable> databaseColumns = ImmutableList .of(column("id", "long", "id"), column("name", "string", "name"), column("type", "string", "type"), column("url", "string", "url"), column("database_name", "string", "database name"), column("username", "string", "username"), column("password","string","password"), column("description", "string", "description"), column("create_time", "string", "create time"), column("update_time", "string", "update time"), column("create_id", "long", "user id"), column("update_id", "long", "user id")); //2、同步文件夹 List<Referenceable> syncFolderColumns = ImmutableList .of(column("id", "long", "id"), column("name", "string", "name"), column("description", "string", "description"), column("create_time", "string", "create time"), column("update_time", "string", "update time"), column("create_id", "long", "user id"), column("update_id", "long", "user id")); //创建表实例 Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns); Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns); //创建视图实例 } @Test public void getEntity() throws AtlasServiceException { Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b"); System.out.println(InstanceSerialization.toJson(referenceable, true)); } }
上述内容就是Apache Atlas是如何构建自己的API,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。