您好,登錄後才能下訂單哦!
這篇文章主要講解了“如何實現elasticsearch導入mysql數據”,文中的講解內容簡單清晰,易於學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何實現elasticsearch導入mysql數據”吧!
<!-- Parent POM: Spring Boot starter parent, version 2.3.2.RELEASE. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- None of the dependencies below declares a <version>; presumably the versions
     are supplied by the parent's dependency management (confirm against the full POM). -->
<dependencies>
    <!-- Spring MVC starter. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Data Elasticsearch starter. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    <!-- MySQL JDBC driver; runtime scope, so it is not on the compile classpath. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
jdbc連接類
/**
 * Minimal JDBC helper that opens connections to the {@code lagou_db} MySQL database
 * using the hard-coded settings below.
 *
 * <p>The most recently opened connection is also published through the static
 * {@link #conn} field. That field is shared by every caller and is not synchronized.
 */
public class DBHelper {
    /** JDBC URL of the source database. */
    public static final String url = "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    /** Fully qualified class name of the JDBC driver that is loaded before connecting. */
    public static final String name = "com.mysql.cj.jdbc.Driver";
    /** Database user. */
    public static final String user = "root";
    /** Database password. */
    public static final String password = "root";
    /** Connection returned by the last successful {@link #getConn()} call, or {@code null}. */
    public static Connection conn = null;

    /**
     * Loads the JDBC driver and opens a new connection.
     *
     * @return a freshly opened connection (never {@code null}); the caller must close it
     * @throws IllegalStateException if the driver class cannot be loaded or the connection
     *         cannot be established; the underlying exception is attached as the cause
     */
    public static Connection getConn() {
        // Drop the previous connection first: if this attempt fails, callers must not
        // be handed a stale (possibly already closed) connection from an earlier call.
        conn = null;
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);
        } catch (ClassNotFoundException | java.sql.SQLException e) {
            // Fail loudly with the cause instead of printing a stack trace and returning null,
            // which only postponed the failure to a NullPointerException in the caller.
            throw new IllegalStateException("Unable to open JDBC connection to " + url, e);
        }
        return conn;
    }
}
@Service("positionService") public class PositionService { @Autowired ElasticsearchRestTemplate elasticsearchTemplate; @Autowired RestHighLevelClient client; private static final String POSITIOIN_INDEX = "position"; public void importAll() throws IOException { writeMysqlDataToES(POSITIOIN_INDEX); } /** 講數據批量寫入ES中 */ private void writeMysqlDataToES(String tableName) { BulkProcessor bulkProcessor = getBulkProcessor(client); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = DBHelper.getConn(); System.out.println("Start handle data :" + tableName); String sql = "SELECT * from " + tableName; ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 根據自己需要 設置 ps.setFetchSize(20); rs = ps.executeQuery(); ResultSetMetaData colData = rs.getMetaData(); ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>(); // bulkProcessor 添加的數據支持的方式并不多,查看其api發現其支持map鍵值對的方式,故筆者在此將查出來的數據轉換成hashMap方式 HashMap<String, String> map = null; int count = 0; String c = null; String v = null; while (rs.next()) { count++; map = new HashMap<String, String>(128); for (int i = 1; i <= colData.getColumnCount(); i++) { c = colData.getColumnName(i); v = rs.getString(c); map.put(c, v); } dataList.add(map); // 每1萬條寫一次,不足的批次的最后再一并提交 if (count % 10000 == 0) { System.out.println("Mysql handle data number : " + count); // 將數據添加到 bulkProcessor 中 for (HashMap<String, String> hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); } // 每提交一次便將map與list清空 map.clear(); dataList.clear(); } } // 處理未提交的數據 for (HashMap<String, String> hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); System.out.println(hashMap2); } System.out.println("-------------------------- Finally insert number total: " + count); // 將數據刷新到es, 注意這一步執行后并不會立即生效,取決于bulkProcessor設置的刷新時間 bulkProcessor.flush(); } catch (Exception e) { System.out.println(e.getMessage()); } 
finally { try { rs.close(); ps.close(); conn.close(); boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); System.out.println(terminatedFlag); } catch (Exception e) { System.out.println(e.getMessage()); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { System.out.println("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId); } }; BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); builder.setConcurrentRequests(10); builder.setFlushInterval(TimeValue.timeValueSeconds(100L)); builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 注意點:讓參數設置生效 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { System.out.println(e1.getMessage()); } } return bulkProcessor; } }
/**
 * REST endpoints for querying positions and for triggering the MySQL-to-Elasticsearch import.
 */
@RestController
public class PositionController {

    @Autowired
    PositionService positionService;

    /**
     * Looks up positions by name.
     *
     * @param positionName name to search for; bound from the request parameter of the same name
     * @return the matches reported by the service, or {@code null} when no name was supplied
     */
    @RequestMapping("query")
    public List<Map> query(String positionName) {
        if (positionName == null) {
            return null;
        }
        return positionService.queryPositions(positionName);
    }

    /**
     * Triggers the full import.
     *
     * @return {@code "success"} when the import call returned normally, {@code "fail"} when
     *         it threw an {@link IOException}
     */
    @RequestMapping("/importAll")
    public String importAll() {
        try {
            positionService.importAll();
        } catch (IOException e) {
            e.printStackTrace();
            // Do not claim success to the client when the import threw.
            return "fail";
        }
        return "success";
    }
}
/**
 * Data holder describing one job position.
 *
 * <p>Only private fields are visible here; no accessors are declared in this snippet.
 */
public class Position implements Serializable {
    // Primary key
    private String id;
    // Company name
    private String companyName;
    // Position name
    private String positionName;
    // Position perks / advantages
    private String positionAdvantage;
    // Salary
    private String salary;
    // Salary lower bound
    private int salaryMin;
    // Salary upper bound
    private int salaryMax;
    // Education requirement
    private String education;
    // Years of work experience
    private String workYear;
    // Publish time
    private String publishTime;
    // City of work
    private String city;
    // Work address
    private String workAddress;
    // NOTE(review): the original comment repeated "publish time"; the field name
    // suggests this is the record creation time - confirm.
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    // Job nature (working mode)
    private String jobNature;
}
前提:安裝好logstash
# Logstash pipeline: reads rows from MySQL through the jdbc input and indexes
# them into the Elasticsearch "position" index.
input {
  stdin { }
  jdbc {
    # Same database URL and credentials as the Java example.
    jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    # Local path to the MySQL driver jar and the driver class inside it.
    jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Paging enabled with a page size of 1000.
    jdbc_paging_enabled => "true"
    jdbc_page_size => "1000"
    # File holding the SQL statement to run.
    statement_filepath => "D:/import.sql"
  }
}
filter {
  # Parse the "message" field as JSON, then drop the raw field.
  # NOTE(review): presumably only relevant to events from stdin - confirm.
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "position"
    document_type => "_doc"
  }
  # Also echo every event to the console as JSON lines.
  stdout {
    codec => json_lines
  }
}
select * from position
logstash -f ../import.conf
感謝各位的閱讀,以上就是“如何實現elasticsearch導入mysql數據”的內容了,經過本文的學習後,相信大家對如何實現elasticsearch導入mysql數據這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這裡是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。