티스토리 뷰

Document API 중에는 BulkProcessor도 있는데, 이를 사용하여 대량의 문서를 bulk로 생성해 보도록 한다.

 

우선 이전 포스팅에서와 같이 생성할 문서의 내용을 BulkData 클래스로 생성한다는 전제로 작성한 코드이다.

	/**
	 * Submits every entry of {@code bulkList} through a BulkProcessor using the
	 * default flush threshold of 1000 actions per bulk request.
	 *
	 * @param bulkList actions to submit
	 * @return the value produced by {@code _bulkDocumentWithBulkProcessor}
	 */
	public Boolean bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList) {
		final int defaultBulkActions = 1000;
		return _bulkDocumentWithBulkProcessor(bulkList, defaultBulkActions);
	}
	
	/**
	 * Submits every entry of {@code bulkList} through a BulkProcessor, flushing
	 * a bulk request each time {@code bulkActions} actions have been queued.
	 *
	 * @param bulkList    actions to submit
	 * @param bulkActions number of queued actions that triggers a flush
	 * @return the value produced by {@code _bulkDocumentWithBulkProcessor}
	 */
	public Boolean bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
		final Boolean finished = _bulkDocumentWithBulkProcessor(bulkList, bulkActions);
		return finished;
	}
	
	/**
	 * Queues one index/update/delete request per {@link BulkData} entry on a
	 * freshly built BulkProcessor, then waits up to 30 seconds for the
	 * outstanding bulk requests to complete.
	 *
	 * @param bulkList    actions to submit; entries whose action type is not
	 *                    CREATE, UPDATE or DELETE are skipped with a warning
	 * @param bulkActions number of queued actions that triggers a flush
	 * @return {@code true} if the processor finished within the timeout,
	 *         {@code false} on timeout or if the waiting thread was interrupted
	 */
	private Boolean _bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
		boolean terminated = false;
		BulkProcessor bulkProcessor = _getBulkProcessor(bulkActions);

		try {
			for (BulkData data : bulkList) {
				// Constant-first equals() so a missing action type cannot throw a NullPointerException.
				if (BulkData.Type.CREATE.equals(data.getActionType())) {
					bulkProcessor.add(new IndexRequest(data.getIndexName(), data.getTypeName(), data.getId()).source(data.getJsonMap()));
				} else if (BulkData.Type.UPDATE.equals(data.getActionType())) {
					bulkProcessor.add(new UpdateRequest(data.getIndexName(), data.getTypeName(), data.getId()).doc(data.getJsonMap()));
				} else if (BulkData.Type.DELETE.equals(data.getActionType())) {
					bulkProcessor.add(new DeleteRequest(data.getIndexName(), data.getTypeName(), data.getId()));
				} else {
					log.warn("Skipping BulkData with unsupported action type: " + data.getActionType());
				}
			}
			// Flushes whatever is still queued and waits for in-flight requests.
			terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
			log.info("BulkProcessor Finished...." + terminated);
		} catch (InterruptedException e) {
			log.error(e);
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
		} finally {
			// Releases the processor even when queuing failed part-way through;
			// this is a no-op when awaitClose() above already closed it.
			bulkProcessor.close();
		}
		return terminated;
	}
    
    /**
     * Builds a BulkProcessor that sends its requests through
     * {@code client.bulkAsync} and logs progress and failures.
     *
     * The processor flushes after {@code bulkActions} queued actions or every
     * 30 seconds, and is configured with zero concurrent requests.
     *
     * @param bulkActions number of queued actions that triggers a flush
     * @return the configured BulkProcessor; the caller is responsible for closing it
     */
    private BulkProcessor _getBulkProcessor(int bulkActions ) {
		BulkProcessor.Listener listener = new BulkProcessor.Listener() {
			// Running total of actions handed to bulk requests by this processor.
			int count = 0;

			@Override
			public void beforeBulk(long executionId, BulkRequest bulkRequest) {
				count = count + bulkRequest.numberOfActions();
				log.info("Uploaded " + count + " so far");
			}

			@Override
			public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
				if (!bulkResponse.hasFailures()) {
					return;
				}
				for (BulkItemResponse bulkItemResponse : bulkResponse) {
					if (bulkItemResponse.isFailed()) {
						BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
						// Per-item failures are errors, not informational messages.
						log.error("Error " + failure.toString());
					}
				}
			}

			@Override
			public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
				// Whole-request failure: log at error level and keep the stack trace.
				log.error("Errors " + throwable.toString(), throwable);
			}
		};

		BulkProcessor bulkProcessor = BulkProcessor.builder(
				(request, bulkListener) ->
				client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
				.setBulkActions(bulkActions).setConcurrentRequests(0)
				.setFlushInterval(TimeValue.timeValueSeconds(30L))
				.build();
		return bulkProcessor;
	}

이렇게 작성한 코드는 bulkDocumentWithBulkProcessor 메소드를 호출하여 생성할 문서의 내용이 담긴 bulkList를 넘겨주면 실행이 된다.

이때 flush할 건수(bulkActions)를 함께 넘겨주면 해당 건수마다 flush를 하고, 넘겨주지 않으면 기본값인 1000건마다 flush를 하게 된다.

 

테스트

	/**
	 * Builds 10,000 CREATE actions for index "test" and submits them with the
	 * default flush threshold, printing the returned result.
	 *
	 * @throws IOException declared by the original test signature; presumably
	 *                     raised by {@code api.close()} (confirm against ClientApi)
	 */
	@Test
	public void bulkProcessor2() throws IOException {
		ClientApi api = new ClientApi(ip);
		try {
			DocumentApi docApi = api.getDocumentApi();

			ArrayList<BulkData> bulkList = new ArrayList<BulkData>();
			for (int i = 1; i < 10001; i++) {
				Map<String, Object> jsonMap = new HashMap<String, Object>();
				jsonMap.put("code", String.valueOf(i));
				jsonMap.put("title", "제목 " + String.valueOf(i));
				jsonMap.put("date", new Date());

				BulkData data = new BulkData();
				data.setActionType(Type.CREATE);
				data.setIndexName("test");
				data.setTypeName("_doc");
				data.setId(String.valueOf(i));
				data.setJsonMap(jsonMap);
				bulkList.add(data);
			}

			boolean result = docApi.bulkDocumentWithBulkProcessor(bulkList);
			System.out.println(result);
		} finally {
			// Close the client even when building or submitting the batch fails.
			api.close();
		}
	}

 

Database에 있는 내용을 select하여 Elasticsearch에 색인을 생성하도록 하는 예제도 작성해 보자.

select 구문을 통해 생성된 ResultSet의 내용을 BulkData에 담아 bulkDocumentWithBulkProcessor() 메소드를 호출하도록 하면 된다.

 

테스트

	/**
	 * Reads rows from table SD_BASE and indexes them into "test" as CREATE
	 * actions, submitting a batch after every 1000 rows and once more for any
	 * remaining rows.
	 *
	 * JDBC resources are managed with try-with-resources so each one is closed
	 * exactly once, in reverse order, even when an earlier step fails.
	 *
	 * @throws IOException declared by the original test signature; presumably
	 *                     raised by {@code api.close()} (confirm against ClientApi)
	 */
	@Test
	public void bulkProcessor3() throws IOException {
		final int batchSize = 1000;
		String SQL = "SELECT no, code, title FROM SD_BASE";

		ClientApi api = new ClientApi(ip);
		try {
			DocumentApi docApi = api.getDocumentApi();
			ConnectionDB con = new ConnectionDB();
			ArrayList<BulkData> bulkList = new ArrayList<BulkData>();

			try (Connection conn = con.DB("com.mysql.cj.jdbc.Driver", "url", "id", "password")) {
				conn.setAutoCommit(false);

				try (Statement stmt = conn.createStatement();
						ResultSet rs = stmt.executeQuery(SQL)) {
					int count = 0;
					while (rs.next()) {
						Map<String, Object> jsonMap = new HashMap<String, Object>();
						jsonMap.put("code", rs.getString("code"));
						jsonMap.put("title", rs.getString("title"));

						BulkData data = new BulkData();
						data.setActionType(Type.CREATE);
						data.setIndexName("test");
						data.setTypeName("_doc");
						data.setId(String.valueOf(rs.getInt("no")));
						data.setJsonMap(jsonMap);
						bulkList.add(data);

						count++;
						if (count % batchSize == 0) {
							boolean result = docApi.bulkDocumentWithBulkProcessor(bulkList);
							System.out.println(result + " :: " + count);
							bulkList.clear();
						}
					}

					// Submit the final partial batch; skip the call when the row
					// count was an exact multiple of the batch size.
					if (!bulkList.isEmpty()) {
						boolean result = docApi.bulkDocumentWithBulkProcessor(bulkList);
						System.out.println(result + " :: " + count);
					}
				}
			} catch (SQLException e) {
				e.printStackTrace();
			}
		} finally {
			// Close the Elasticsearch client even when the database step fails.
			api.close();
		}
	}

 

전체 코드는 

https://github.com/joyhong85/elasticsearch-client

 

joyhong85/elasticsearch-client

Elasticsearch Java High Level Rest Client. Contribute to joyhong85/elasticsearch-client development by creating an account on GitHub.

github.com

에서 확인할 수 있다.

 

최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함