SLS 日志服务的集成配置是怎样的?很多没有相关经验的开发者对此无从下手。为此,本文整理了一套完整的集成示例:Maven 依赖、Producer/Client 配置类、日志发送工具类、发送回调,以及一个把 ES 数据写入 SLS 的测试用例,希望能帮助你解决这个问题。
<!-- Server-side dependencies for Aliyun Log Service (SLS) -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>log-loghub-producer</artifactId>
    <version>0.1.4</version>
    <exclusions>
        <exclusion>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.4</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.33</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.16</version>
</dependency>
package com.yhzy.doudoubookserver.global.alilog; import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.producer.LogProducer; import com.aliyun.openservices.log.producer.ProducerConfig; import com.aliyun.openservices.log.producer.ProjectConfig; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * @author zhangqinghe * @version 1.0.0 * @email her550@dingtalk.com * @date 2020/12/23 * @since 1.0.0 */ @Configuration @Scope("singleton") public class AliLogConfig { public static String accessKeyId = ""; public static String accessKeySecret = ""; public static String endPoint = ""; public static String projectName = SLSEnvironment.BOOK_PROJECT; @Bean @ConditionalOnClass(LogProducer.class) public LogProducer getLogProducer() { LogProducer producer = new LogProducer(producerConfig()); producer.setProjectConfig(projectConfig()); return producer; } @Bean @ConditionalOnClass(ProducerConfig.class) public ProducerConfig producerConfig() { ProducerConfig producerConfig = new ProducerConfig(); //被缓存起来的日志的发送超时时间,如果缓存超时,则会被立即发送,单位是毫秒 producerConfig.packageTimeoutInMS = 1000; //每个缓存的日志包的大小的上限,不能超过5MB,单位是字节 producerConfig.logsBytesPerPackage = 5 * 1024 * 1024; //每个缓存的日志包中包含日志数量的最大值,不能超过4096 producerConfig.logsCountPerPackage = 4096; //单个producer实例可以使用的内存的上限,单位是字节 producerConfig.memPoolSizeInByte = 1000 * 1024 * 1024; //IO线程池最大线程数量,主要用于发送数据到日志服务 producerConfig.maxIOThreadSizeInPool = 50; //当使用指定shardhash的方式发送日志时,这个参数需要被设置,否则不需要关心。后端merge线程会将映射到同一个shard的数据merge在一起,而shard关联的是一个hash区间, //producer在处理时会将用户传入的hash映射成shard关联hash区间的最小值。每一个shard关联的hash区间,producer会定时从loghub拉取,该参数的含义是每隔shardHashUpdateIntervalInMS毫秒, producerConfig.shardHashUpdateIntervalInMS = 10 * 60 * 1000; producerConfig.retryTimes = 3; return producerConfig; } @Bean @ConditionalOnClass(ProjectConfig.class) 
public ProjectConfig projectConfig() { return new ProjectConfig(projectName, endPoint, accessKeyId, accessKeySecret); } /** * 读取sls对象 用于读取数据 * @return */ @Bean public Client client(){ String accessId = ""; String accessKey = ""; String host = ""; return new Client(host, accessId, accessKey); } }
package com.yhzy.doudoubookserver.common; import com.aliyun.openservices.log.common.LogItem; import com.aliyun.openservices.log.producer.LogProducer; import com.yhzy.doudoubookserver.global.alilog.AliLogConfig; import com.yhzy.doudoubookserver.global.alilog.CallbackLogInfo; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Vector; /** * @author zhangqinghe * @version 1.0.0 * @email her550@dingtalk.com * @date 2020/12/23 * @since 1.0.0 */ @Component public class AliLogUtil { @Resource private AliLogConfig aliLogConfig; public void saveLog(String projectName,String logStore, Vector<LogItem> logGroup, String topic, String source,Long millis) throws InterruptedException { final LogProducer logProducer = aliLogConfig.getLogProducer(); // 并发调用 send 发送日志 logProducer.send(projectName, logStore, topic, source, logGroup, new CallbackLogInfo(projectName, logStore, topic,null, source, logGroup, logProducer)); //主动刷新缓存起来的还没有被发送的日志 logProducer.flush(); //等待发送线程退出 Thread.sleep(millis); //关闭后台io线程,close会将调用时刻内存中缓存的数据发送出去 logProducer.close(); } }
package com.yhzy.doudoubookserver.global.alilog; import com.aliyun.openservices.log.common.LogItem; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.producer.ILogCallback; import com.aliyun.openservices.log.producer.LogProducer; import com.aliyun.openservices.log.response.PutLogsResponse; import java.util.Vector; /** * @author zhangqinghe * @version 1.0.0 * @email her550@dingtalk.com * @date 2020/12/23 * @since 1.0.0 */ public class CallbackLogInfo extends ILogCallback { // 保存要发送的数据,当时发生异常时,进行重试 public String project; public String logstore; public String topic; public String shardHash; public String source; public Vector<LogItem> items; public LogProducer producer; public int retryTimes = 0; public CallbackLogInfo(String project, String logstore, String topic, String shardHash, String source, Vector<LogItem> items, LogProducer producer) { super(); this.project = project; this.logstore = logstore; this.topic = topic; this.shardHash = shardHash; this.source = source; this.items = items; this.producer = producer; } public void onCompletion(PutLogsResponse response, LogException e) { if (e != null) { // 打印异常 System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId()); // 最多重试三次 if (retryTimes++ < 3) { producer.send(project, logstore, topic, source, shardHash, items, this); } } else { //请求id System.out.println("send success, request id: " + response.GetRequestId()); } } }
SLS 日志相关的 project 与 logStore 名称常量,以及查询时间范围的辅助方法:
package com.yhzy.doudoubookserver.global.alilog; import java.time.LocalDateTime; import java.time.ZoneOffset; /** * sls日志相关 project & logStore 参数名配置 * * @author zhangqinghe * @version 1.0.0 * @email her550@dingtalk.com * @date 2020/12/25 * @since 1.0.0 */ public interface SLSEnvironment { /** 测试project*/ String TEST_PROJECT = "test_project"; /** 测试logStore*/ String TEST_LOGSTORE = "test_logstore"; /** 开始时间*/ public static Integer FROM() { return Math.toIntExact(LocalDateTime.now().plusDays(-3).withHour(0).withMinute(0).withSecond(0).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli() / 1000); } /** 结束时间*/ public static Integer TO() { return Math.toIntExact(LocalDateTime.now().plusDays(1).withHour(23).withMinute(59).withSecond(59).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli() / 1000); } /** 最小开始时间 该时间为2020年12月1日的unix时间戳*/ public static Integer MINFROM(){ return 1606752000; } /** 最大结束时间 该时间为当前时间的unix时间戳*/ public static Integer MAXTO() { return Math.toIntExact((System.currentTimeMillis()+86400000) / 1000); } }
测试示例:读取 Elasticsearch(ES)中的数据并写入 SLS,同时演示如何从 SLS 查询分析数据:
package com.yhzy; /** * @author zhangqinghe * @version 1.0.0 * @email her550@dingtalk.com * @date 2020/12/24 * @since 1.0.0 */ import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.*; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.response.*; import com.yhzy.doudoubookserver.common.AliLogUtil; import com.yhzy.doudoubookserver.global.alilog.SLSEnvironment; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; import java.io.IOException; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.*; /** * @author zhangqinghe * @version 1.0.0 * @email her550@dingtalk.com * @date 2020/12/23 * @since 1.0.0 */ @RunWith(SpringRunner.class) @SpringBootTest(classes = DoudouSLSApplication.class) public class AliLogTest { @Resource private AliLogUtil aliLogUtil; @Resource private RestHighLevelClient restHighLevelClient; /** * 读取分析测试 * @throws LogException */ @Test public void read() throws LogException { String accessId = ""; String accessKey = ""; String host = ""; String project = "zqhtest"; String logStore = "test_logstore"; Client client = new Client(host, accessId, accessKey); long time = new Date().getTime()/1000; /* * project : Project名称。 * logStore : LogStore名称。 * from : 查询开始时间点。Unix时间戳格式,表示从1970-1-1 
00:00:00 UTC计算起的秒数。 * to : 查询结束时间点。Unix时间戳格式,表示从1970-1-1 00:00:00 UTC计算起的秒数。 * topic : 日志主题。 * query : 查询分析语句。 具体查看 https://help.aliyun.com/document_detail/53608.html?spm=a2c4g.11186623.2.17.56322e4acEkU4X#concept-nyf-cjq-zdb * line : 请求返回的最大日志条数。最小值为0,最大值为100,默认值为100。 没用明白呢...这个和query中的limit是两个东西???? * offset : 查询开始行。默认值为0。 * reverse : 是否按日志时间戳逆序返回日志,精确到分钟级别。默认值为false。 true:按照逆序返回日志。 false:按照顺序返回日志。 */ GetLogsResponse getLogsResponse = client.GetLogs( project, logStore, (int) (time - 36000000), (int) time, "", "* | select app_version,channel_id,content_id,COUNT(content_id) contentCount,COUNT(content_id) chapterCount, sum(time) timeSum GROUP BY app_version,channel_id,content_id LIMIT 100010", 25, 0, false); for (QueriedLog getLog : getLogsResponse.GetLogs()) { System.out.println(getLog.GetLogItem().ToJsonString()); } } /** * 采集数据测试 * 读取es中的数据,写入到sls中 * @throws IOException */ @Test public void test1() throws IOException { Vector<LogItem> logItems = new Vector<>(); //读取es中的书籍阅读数据 SearchRequest request = new SearchRequest("bookanalysis"); long start = LocalDateTime.now().plusDays(-1).withHour(11).withMinute(0).withSecond(0).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli(); long end = LocalDateTime.now().plusDays(-1).withHour(12).withMinute(0).withSecond(0).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); List<QueryBuilder> filter = boolQuery.filter(); //时间区间条件 filter.add(rangeQuery(start, end, "create_time")); //这里拼接动态条件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //添加条件 searchSourceBuilder.query(boolQuery); request.source(searchSourceBuilder.trackTotalHits(true).size(15000)); SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); for (SearchHit hit : response.getHits().getHits()) { Map<String, Object> map = hit.getSourceAsMap(); LogItem logItem = new LogItem(); 
logItem.PushBack("app_version",map.get("app_version")==null?"1":map.get("app_version").toString()); logItem.PushBack("book_id",map.get("book_id")==null?"1":map.get("book_id").toString()); logItem.PushBack("book_name",map.get("book_name")==null?"1":map.get("book_name").toString()); logItem.PushBack("category_id_1",map.get("category_id_1")==null?"1":map.get("category_id_1").toString()); logItem.PushBack("category_id_2",map.get("category_id_2")==null?"1":map.get("category_id_2").toString()); logItem.PushBack("channel_id",map.get("channel_id")==null?"1":map.get("channel_id").toString()); logItem.PushBack("chapter_name",map.get("chapter_name")==null?"1":map.get("chapter_name").toString()); logItem.PushBack("content_id",map.get("content_id")==null?"1":map.get("content_id").toString()); logItem.PushBack("create_time",map.get("create_time")==null?"1":map.get("create_time").toString()); logItem.PushBack("device_id",map.get("device_id")==null?"1":map.get("device_id").toString()); logItem.PushBack("is_new",map.get("is_new")==null?"1":map.get("is_new").toString()); logItem.PushBack("is_official",map.get("is_official")==null?"1":map.get("is_official").toString()); logItem.PushBack("is_vip",map.get("is_vip")==null?"1":map.get("is_vip").toString()); logItem.PushBack("log_type",map.get("log_type")==null?"1":map.get("log_type").toString()); logItem.PushBack("position",map.get("position")==null?"1":map.get("position").toString()); logItem.PushBack("sex",map.get("site")==null?"1":map.get("site").toString()); logItem.PushBack("time",map.get("time")==null?"1":map.get("time").toString()); logItem.PushBack("user_id",map.get("user_id")==null?"1":map.get("user_id").toString()); logItems.add(logItem); } try { aliLogUtil.saveLog(SLSEnvironment.BOOK_PROJECT,SLSEnvironment.BOOK_LOGSTORE,logItems,"read","source ip",1000L); } catch (InterruptedException e) { e.printStackTrace(); } } public static RangeQueryBuilder rangeQuery(Long startTime, Long endTime, String name){ RangeQueryBuilder 
rangeQueryBuilder = QueryBuilders.rangeQuery(name); if (startTime != null) { rangeQueryBuilder.gte(startTime); } if (endTime != null) { rangeQueryBuilder.lte(endTime); } return rangeQueryBuilder; } }
看完上述内容,你掌握 SLS 日志服务的集成配置方法了吗?如果还想学到更多技能或了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。