Updating an index from Java (update & upsert)
update
An update uses UpdateRequest (a plain update can only modify an existing document; it does not create one)
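import java.text.SimpleDateFormat;
import java.util.Date;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class EsUpdate {
    // parse(), jsonBuilder() and get() all throw checked exceptions
    public void updateIndex(TransportClient client) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date time = format.parse("2016-7-21 00:00:01");
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("pointdata")
                .type("pointdata")
                .id("1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("pointid", "W3.UNIT1.10LBG01CP302")
                        .field("pointvalue", "0.8029")
                        .field("inputtime", format.format(time))
                        .endObject());
        // Execute the update
        UpdateResponse response1 = client.update(updateRequest).get();
        // Check the result; a status of OK means the update succeeded
        System.out.println(response1.status());
    }
}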
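To make the "update only" behaviour concrete, here is a minimal in-memory sketch. It is not the Elasticsearch API: the class PartialUpdateSketch, its store map and its methods are hypothetical names chosen for illustration, and it assumes flat (non-nested) fields. It shows the two properties that matter: fields in the partial document overwrite or extend the stored document, and updating a missing id fails.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of a partial update; an illustration, not the Elasticsearch API.
class PartialUpdateSketch {
    // Hypothetical store: document id -> field map.
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    // Stores a full document under the given id, replacing any previous one.
    void index(String id, Map<String, Object> source) {
        store.put(id, new HashMap<>(source));
    }

    // Merges partialDoc into the stored document and returns the merged result.
    // Throws if the id is unknown, mirroring how a plain update fails on a missing document.
    Map<String, Object> update(String id, Map<String, Object> partialDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            throw new IllegalStateException("document missing: " + id);
        }
        existing.putAll(partialDoc);
        return existing;
    }

    public static void main(String[] args) {
        PartialUpdateSketch es = new PartialUpdateSketch();
        Map<String, Object> source = new HashMap<>();
        source.put("pointid", "W3.UNIT1.10LBG01CP302");
        source.put("pointvalue", "0.8029");
        es.index("1", source);

        Map<String, Object> partialDoc = new HashMap<>();
        partialDoc.put("pointvalue", "0.9000");
        // Only pointvalue changes; pointid is kept from the stored document.
        System.out.println(es.update("1", partialDoc));
    }
}
```

The fields that are not mentioned in the partial document are left untouched, which is why an update request only needs to carry the fields that change.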
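upsert
Use an IndexRequest to define the document to insert and an UpdateRequest to define the update, then attach the IndexRequest through upsert: if the document exists it is updated, otherwise it is created.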
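import java.text.SimpleDateFormat;
import java.util.Date;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class EsUpsert {
    public void updateIndex(TransportClient client) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date time = format.parse("2016-7-21 00:00:01");
        // Document to insert when id "1" does not exist yet
        // (index, type and id must match the update request below)
        IndexRequest request1 = new IndexRequest("pointdata", "pointdata", "1")
                .source(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("pointid", "W3.UNIT1.10LBG01CP302")
                        .field("pointvalue", "0.8029")
                        .field("inputtime", format.format(time))
                        .endObject());
        // Partial document to apply when id "1" already exists
        UpdateRequest updateRequest2 = new UpdateRequest();
        updateRequest2.index("pointdata")
                .type("pointdata")
                .id("1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("pointid", "W3.UNIT1.10LBG01CP302")
                        .field("pointvalue", "0.8029")
                        .field("inputtime", format.format(time))
                        .endObject())
                .upsert(request1);
        UpdateResponse response2 = client.update(updateRequest2).get();
        // Check the result status of the upsert
        System.out.println(response2.status());
    }
}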
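The difference between the two documents in an upsert is easy to miss, so here is a small in-memory sketch of the semantics. It is an illustration only, not the Elasticsearch API: UpsertSketch, Result and store are hypothetical names, and fields are assumed to be flat. The point it shows is that the upsert document is used only when the id does not exist, while the partial document is used only when it does.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of upsert semantics; an illustration, not the Elasticsearch API.
class UpsertSketch {
    enum Result { CREATED, UPDATED }

    // Hypothetical store: document id -> field map.
    final Map<String, Map<String, Object>> store = new HashMap<>();

    // If the document exists, merge partialDoc into it;
    // otherwise insert upsertDoc as the new document and ignore partialDoc.
    Result upsert(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            store.put(id, new HashMap<>(upsertDoc));
            return Result.CREATED;
        }
        existing.putAll(partialDoc);
        return Result.UPDATED;
    }

    public static void main(String[] args) {
        UpsertSketch es = new UpsertSketch();
        Map<String, Object> upsertDoc = new HashMap<>();
        upsertDoc.put("pointvalue", "0.8029");
        Map<String, Object> partialDoc = new HashMap<>();
        partialDoc.put("pointvalue", "0.9000");
        // First call: id "1" is unknown, so upsertDoc is inserted.
        System.out.println(es.upsert("1", partialDoc, upsertDoc));
        // Second call: id "1" exists, so partialDoc is merged into it.
        System.out.println(es.upsert("1", partialDoc, upsertDoc));
    }
}
```

On a real cluster the first case is normally reported as CREATED rather than OK, so code that inspects the response status should probably accept both.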
Bulk updates (update & upsert)
Dynamically update any field of a document, including fields the document did not have before.
// client, LOGGER and BULK_SIZE are fields of the enclosing class
public ResultMsg updateIndex(final String index,
        final JSONArray resultList) {
    final ResultMsg resultMsg = new ResultMsg();
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
    int count = 0;
    int len = resultList.size();
    try {
        for (int i = 0; i < len; i++) {
            final XContentBuilder builder = XContentFactory.jsonBuilder()
                    .startObject();
            JSONObject json = resultList.getJSONObject(i);
            // The "id" entry is the document id, not a field of the document
            final String id = json.getString("id");
            json.remove("id");
            for (String key : json.keySet()) {
                builder.field(key, json.get(key));
            }
            builder.endObject();
            // Plain update (fails for ids that do not exist):
            // UpdateRequestBuilder requestBuilder = client.prepareUpdate(
            //         index, index, id).setDoc(builder);
            // Upsert: use the partial document as the new document when the id is missing
            UpdateRequestBuilder requestBuilder = client.prepareUpdate(
                    index, index, id).setDoc(builder).setDocAsUpsert(true);
            bulkRequest.add(requestBuilder);
            count++;
            // Flush a full batch
            if (count % BULK_SIZE == 0) {
                BulkResponse bulkResponse = bulkRequest.execute()
                        .actionGet();
                if (bulkResponse.hasFailures()) {
                    LOGGER.error("Bulk index update failed: " + index);
                    LOGGER.error("Bulk index update failed: "
                            + bulkResponse.buildFailureMessage());
                    resultMsg.setHasFailures(true);
                    // Position in resultList of the first item of this batch
                    int batchStart = i + 1 - BULK_SIZE;
                    BulkItemResponse[] responses = bulkResponse.getItems();
                    for (int k = 0; k < responses.length; k++) {
                        BulkItemResponse response = responses[k];
                        if (response.isFailed()) {
                            ErrorDetail errorDetail = new ErrorDetail();
                            errorDetail.setIndex(batchStart + k);
                            errorDetail.setId(response.getId());
                            errorDetail.setMsg(response.getFailureMessage());
                            resultMsg.addError(errorDetail);
                        }
                    }
                }
                // Start a new batch
                bulkRequest = client.prepareBulk();
                bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
                count = 0;
            }
        }
        // Flush the remaining items of the last, partial batch
        if (count > 0) {
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                LOGGER.error("Bulk index update failed: " + index);
                LOGGER.error("Bulk index update failed: "
                        + bulkResponse.buildFailureMessage());
                resultMsg.setHasFailures(true);
                // Position in resultList of the first item of the last batch
                int batchStart = len - count;
                BulkItemResponse[] responses = bulkResponse.getItems();
                for (int k = 0; k < responses.length; k++) {
                    BulkItemResponse response = responses[k];
                    if (response.isFailed()) {
                        ErrorDetail errorDetail = new ErrorDetail();
                        errorDetail.setIndex(batchStart + k);
                        errorDetail.setId(response.getId());
                        errorDetail.setMsg(response.getFailureMessage());
                        resultMsg.addError(errorDetail);
                    }
                }
            }
        }
        return resultMsg;
    } catch (Exception e) {
        LOGGER.error("Bulk index update failed: " + index);
        throw new RuntimeException("Bulk index update failed: " + index, e);
    }
}
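The batching in the method above splits the input into groups of BULK_SIZE and reports failures per item. If the reported error position is meant to refer back to the entry in the input list (an assumption; the article does not say what ErrorDetail's index means), the mapping can be sketched in plain Java as follows. BatchOffsetSketch, partition and originalIndex are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of batching and of mapping a batch position back to the input list.
class BatchOffsetSketch {
    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    // Position in the original list of the k-th item of batch number batchNo (both 0-based).
    static int originalIndex(int batchNo, int batchSize, int k) {
        return batchNo * batchSize + k;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            ids.add("doc-" + i);
        }
        List<List<String>> batches = partition(ids, 2);
        // Walk every batch and recover each item from the original list by its computed position.
        for (int b = 0; b < batches.size(); b++) {
            for (int k = 0; k < batches.get(b).size(); k++) {
                int pos = originalIndex(b, 2, k);
                System.out.println(batches.get(b).get(k) + " is item " + pos + " of the input");
            }
        }
    }
}
```

Every batch except possibly the last has exactly batchSize items, so the batch number times the batch size gives the offset of the batch's first item.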
Test:
@Test
public void updateIndex() {
    // updateIndex(String, JSONArray) expects a JSONArray, not a List<JSONObject>
    JSONArray jsonList = new JSONArray();
    JSONObject json = new JSONObject();
    json.put("id", "1");
    json.put("tag", 1);
    jsonList.add(json);
    updateIndex("index", jsonList);
}