亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何實現elasticsearch導入mysql數據

發布時間:2021-12-04 11:58:26 來源:億速云 閱讀:245 作者:iii 欄目:大數據

這篇文章主要講解了“如何實現elasticsearch導入mysql數據”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何實現elasticsearch導入mysql數據”吧!

一、基于elasticsearch的官方API批量導入

引入maven依賴
	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

jdbc連接類

public class DBHelper {
    public static final String url =
            "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";
    public static Connection conn = null;
    public static Connection getConn() {
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);//獲取連接
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}
導入邏輯
@Service("positionService")
public class PositionService {

    @Autowired
    ElasticsearchRestTemplate elasticsearchTemplate;
    @Autowired
    RestHighLevelClient client;

    private static final String POSITIOIN_INDEX = "position";

    public void importAll() throws IOException {
        writeMysqlDataToES(POSITIOIN_INDEX);
    }
    /** 講數據批量寫入ES中 */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = DBHelper.getConn();
            System.out.println("Start handle data :" + tableName);
            String sql = "SELECT * from " + tableName;
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY);
            // 根據自己需要 設置
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new
                    ArrayList<HashMap<String, String>>();
            // bulkProcessor 添加的數據支持的方式并不多,查看其api發現其支持map鍵值對的方式,故筆者在此將查出來的數據轉換成hashMap方式
            HashMap<String, String> map = null;
            int count = 0;
            String c = null;
            String v = null;
            while (rs.next()) {
                count++;
                map = new HashMap<String, String>(128);
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // 每1萬條寫一次,不足的批次的最后再一并提交
                if (count % 10000 == 0) {
                    System.out.println("Mysql handle data number : " + count);
                    // 將數據添加到 bulkProcessor 中
                    for (HashMap<String, String> hashMap2 : dataList) {
                        bulkProcessor.add(
                                new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                    }
                    // 每提交一次便將map與list清空
                    map.clear();
                    dataList.clear();
                }
            }
            // 處理未提交的數據
            for (HashMap<String, String> hashMap2 : dataList) {
                bulkProcessor.add(
                        new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                System.out.println(hashMap2);
            }
            System.out.println("-------------------------- Finally insert number total: " + count);
            // 將數據刷新到es, 注意這一步執行后并不會立即生效,取決于bulkProcessor設置的刷新時間
            bulkProcessor.flush();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            try {
                rs.close();
                ps.close();
                conn.close();
                boolean terminatedFlag = bulkProcessor.awaitClose(150L,
                        TimeUnit.SECONDS);
                System.out.println(terminatedFlag);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }

    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("Try to insert data number : " + request.numberOfActions());
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId);
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
                    System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client
                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            // 注意點:讓參數設置生效
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                System.out.println(e1.getMessage());
            }
        }
        return bulkProcessor;
    }
}
調用入口
@RestController
public class PositionController {

    @Autowired
    PositionService positionService;

    @RequestMapping("query")
    public List<Map> query(String positionName) {

        if(positionName == null){
            return null;
        }

        return positionService.queryPositions(positionName);
    }

    @RequestMapping("/importAll")
    public String importAll(){
        try {
            positionService.importAll();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
導入的數據表
public class Position implements Serializable {

    //主鍵
    private String id;
    //公司名稱
    private String companyName;
    //職位名稱
    private String positionName;

    //職位誘惑
    private String positionAdvantage;
    //薪資
    private String salary;
    //薪資下限
    private int salaryMin;
    //薪資上限
    private int salaryMax;
    //學歷
    private String education;
    //工作年限
    private String workYear;
    //發布時間
    private String publishTime;
    //工作城市
    private String city;
    //工作地點
    private String workAddress;
    //發布時間
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    //工作模式
    private String jobNature;
}

二、基于logstash導入

前提:安裝好logstash

import.conf
input {
    stdin {
    }
    jdbc {
   
      jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
 
      jdbc_user => "root"
      jdbc_password => "root" 
      jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "1000"
   
      statement_filepath => "D:/import.sql"
  
 
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "position"
        document_type => "_doc"
 
    }
    stdout {
        codec => json_lines
    }
}
import.sql
select * from position
啟動logstash
logstash -f ../import.conf

感謝各位的閱讀,以上就是“如何實現elasticsearch導入mysql數據”的內容了,經過本文的學習后,相信大家對如何實現elasticsearch導入mysql數據這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

鞍山市| 泰安市| 札达县| 舒兰市| 山东省| 晋中市| 临夏县| 平安县| 枣阳市| 集贤县| 碌曲县| 孟州市| 襄汾县| 英山县| 玉屏| 常州市| 贵南县| 海林市| 信阳市| 盐津县| 道真| 灵丘县| 健康| 措勤县| 洱源县| 蒙自县| 米易县| 沈丘县| 宜城市| 呼和浩特市| 宁武县| 洛川县| 京山县| 宣化县| 海南省| 东山县| 嘉义县| 阳江市| 阿拉善右旗| 呼伦贝尔市| 富宁县|