This article walks through how to import MySQL data into Elasticsearch, first with Java code built on the high-level REST client's BulkProcessor, then with a Logstash JDBC pipeline. The steps are kept simple so they are easy to follow along with. First, the Maven dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
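spring-boot-starter-data-elasticsearch auto-configures the RestHighLevelClient that the service below injects. Assuming Elasticsearch is running locally on its default port (an assumption; adjust to your environment), pointing the client at it takes one line in application.properties, using the property name from Spring Boot 2.3's REST client auto-configuration:

spring.elasticsearch.rest.uris=http://localhost:9200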
The JDBC helper class used to connect to MySQL:
import java.sql.Connection;
import java.sql.DriverManager;

public class DBHelper {
    public static final String url =
            "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String driver = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";

    public static Connection getConn() {
        Connection conn = null;
        try {
            Class.forName(driver);
            // Obtain the connection
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}
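DBHelper opens a fresh connection through DriverManager on every call, which is fine for a one-off import job; for anything long-lived you would normally swap in a pooled DataSource (HikariCP, for example) instead.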
@Service("positionService")
public class PositionService {
@Autowired
ElasticsearchRestTemplate elasticsearchTemplate;
@Autowired
RestHighLevelClient client;
private static final String POSITIOIN_INDEX = "position";
public void importAll() throws IOException {
writeMysqlDataToES(POSITIOIN_INDEX);
}
    /** Write the MySQL data to ES in batches. */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = DBHelper.getConn();
            System.out.println("Start handling data: " + tableName);
            String sql = "SELECT * FROM " + tableName;
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY);
            // Adjust the fetch size to suit your data volume
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new ArrayList<>();
            // BulkProcessor does not accept many source formats; its API does support
            // map-based key/value pairs, so each row is converted into a HashMap here
            HashMap<String, String> map;
            int count = 0;
            String c;
            String v;
            while (rs.next()) {
                count++;
                map = new HashMap<>(128);
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // Submit every 10,000 rows; whatever is left over is submitted after the loop
                if (count % 10000 == 0) {
                    System.out.println("MySQL rows handled so far: " + count);
                    // Hand the buffered rows over to the BulkProcessor
                    for (HashMap<String, String> row : dataList) {
                        bulkProcessor.add(new IndexRequest(POSITION_INDEX).source(row));
                    }
                    // Clear the buffer after each submission
                    dataList.clear();
                }
            }
            // Submit the final, incomplete batch
            for (HashMap<String, String> row : dataList) {
                bulkProcessor.add(new IndexRequest(POSITION_INDEX).source(row));
                System.out.println(row);
            }
            System.out.println("-------------------------- Total rows inserted: " + count);
            // Flush the buffered requests to ES. Note that this does not necessarily take
            // effect immediately; it depends on how the BulkProcessor's flushing is configured
            bulkProcessor.flush();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            try {
                // Close the JDBC resources null-safely, then wait for in-flight bulk requests
                if (rs != null) rs.close();
                if (ps != null) ps.close();
                if (conn != null) conn.close();
                boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                System.out.println(terminatedFlag);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("Trying to insert data, number of actions: " + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    System.out.println("************** Successfully inserted "
                            + request.numberOfActions() + " records, id: " + executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
                    System.out.println("Bulk request failed: " + failure + ", executionId: " + executionId);
                }
            };

            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client
                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
            builder.setBulkActions(5000);                                  // flush every 5,000 actions...
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); // ...or every 100 MB...
            builder.setConcurrentRequests(10);                             // up to 10 concurrent bulk requests
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));    // ...or every 100 seconds
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            // Note: it is build() that makes the settings above take effect
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                if (bulkProcessor != null) {
                    bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
                }
            } catch (Exception e1) {
                System.out.println(e1.getMessage());
            }
        }
        return bulkProcessor;
    }
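
    // A minimal sketch of the queryPositions method that the controller below calls;
    // the original article does not show it, so the match on the "positionName" field
    // and the page size of 20 are assumptions. Additional imports needed:
    // org.elasticsearch.action.search.SearchRequest, org.elasticsearch.action.search.SearchResponse,
    // org.elasticsearch.index.query.QueryBuilders, org.elasticsearch.search.SearchHit,
    // org.elasticsearch.search.builder.SearchSourceBuilder
    public List<Map> queryPositions(String positionName) {
        List<Map> result = new ArrayList<>();
        try {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                    .query(QueryBuilders.matchQuery("positionName", positionName))
                    .size(20);
            SearchRequest searchRequest = new SearchRequest(POSITION_INDEX).source(sourceBuilder);
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            // Return each hit's _source as a plain map
            for (SearchHit hit : response.getHits().getHits()) {
                result.add(hit.getSourceAsMap());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }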
}
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PositionController {

    @Autowired
    PositionService positionService;

    @RequestMapping("/query")
    public List<Map> query(String positionName) {
        if (positionName == null) {
            return null;
        }
        return positionService.queryPositions(positionName);
    }

    @RequestMapping("/importAll")
    public String importAll() {
        try {
            positionService.importAll();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
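With the application running (assuming Spring Boot's default port 8080), the two endpoints can be exercised from the command line:

curl "http://localhost:8080/importAll"
curl "http://localhost:8080/query?positionName=java"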
import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Position implements Serializable {
    // Primary key
    private String id;
    // Company name
    private String companyName;
    // Position name
    private String positionName;
    // Position perks
    private String positionAdvantage;
    // Salary
    private String salary;
    // Salary lower bound
    private int salaryMin;
    // Salary upper bound
    private int salaryMax;
    // Education requirement
    private String education;
    // Years of work experience
    private String workYear;
    // Publish time
    private String publishTime;
    // Work city
    private String city;
    // Work address
    private String workAddress;
    // Creation time
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    // Job nature (full-time, etc.)
    private String jobNature;
}
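Getters and setters are omitted from the listing above; generate them in your IDE or, if Lombok is on the classpath (an assumption, since the pom above does not include it), annotate the class with @Data.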
Prerequisite: Logstash is installed. The second approach skips the Java code entirely: a Logstash pipeline reads the rows from MySQL through the JDBC input and writes them to the position index. Save the following as import.conf:
input {
    stdin {
    }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "1000"
        statement_filepath => "D:/import.sql"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "position"
        document_type => "_doc"
    }
    stdout {
        codec => json_lines
    }
}
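Note that document_type is deprecated in the elasticsearch output; on Elasticsearch 7.x every document is typed _doc anyway, so the line can be dropped there.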
The import.sql file referenced by statement_filepath contains the query to run:

select * from position

Finally, start Logstash with the pipeline config:

logstash -f ../import.conf
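This performs a one-shot full import. If the pipeline should instead run continuously and only pick up new rows, the jdbc input also supports a cron-style schedule and a tracking column; a minimal sketch, assuming position has an auto-increment id column (here an inline statement replaces statement_filepath):

jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    schedule => "* * * * *"                      # run once a minute
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    statement => "SELECT * FROM position WHERE id > :sql_last_value"
}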
Thanks for reading. That covers how to import MySQL data into Elasticsearch, both with the Java BulkProcessor and with a Logstash JDBC pipeline; which approach fits best is something you should verify against your own data.
Original article: https://my.oschina.net/kongjiang1995/blog/4642844