ES 去重：查出重复数据，保留 ID 最小的一条，其余数据批量删除
使用脚本（Java 代码）进行去重：先通过 terms 聚合找出重复的 mail，再对每组重复数据按 id 升序查询，
保留 ID 最小的文档，将其余文档批量删除。
代码如下：
Controller：
/**
 * Deletes duplicate documents that share the same {@code mail.keyword} value, keeping the
 * document with the smallest {@code id} in each group.
 *
 * <p>Each pass runs a terms aggregation over {@code mail.keyword} restricted to values that
 * occur at least twice. For every such value, the matching documents are fetched sorted by
 * {@code id} ascending. The first one is kept, and the remaining ids are handed to
 * {@code esManager.synchronizationUpByBach}. Passes repeat until the aggregation returns no
 * buckets or a pass submits no ids at all; the second condition guarantees termination.
 *
 * <p>NOTE(review): if {@code synchronizationUpByBach} runs asynchronously, its deletions may
 * not be visible when the next pass queries the index. The same duplicates can then be
 * submitted again and {@code num} over-counts. Consider deleting synchronously here.
 *
 * @param size number of duplicate buckets per pass and documents fetched per mail;
 *     {@code null} or non-positive values default to 1000
 */
@Async
@GetMapping("/up")
@ApiOperation(value = "更新并删除重复数据", notes = "更新并删除重复数据")
public void up(Integer size) {
    size = size == null || size <= 0 ? 1000 : size;
    // Fetch at least two documents per mail. With a page of one, only the kept document comes
    // back, nothing is ever submitted for deletion, and no progress can be made.
    int pageSize = Math.max(size, 2);
    long num = 0;
    while (true) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        NativeSearchQuery query = new NativeSearchQuery(builder);
        // Only mail.keyword is aggregated; buckets with a single document are excluded.
        query.addAggregation(AggregationBuilders.terms("mail").field("mail.keyword").size(size).minDocCount(2));
        SearchHits<EsDto> searchHits = elasticsearchRestTemplate.search(query, EsDto.class);
        if (searchHits.getAggregations() == null) {
            break;
        }
        Terms aggs = searchHits.getAggregations().get("mail");
        if (aggs == null || aggs.getBuckets().isEmpty()) {
            break;
        }
        long submittedThisPass = 0;
        for (Terms.Bucket entry : aggs.getBuckets()) {
            String mail = entry.getKeyAsString();
            log.info(" syncUp mail : {} - mailCount:{}", mail, entry.getDocCount());
            NativeSearchQuery queryMail = this.getQuery(0, pageSize, mail);
            SearchHits<EsDto> mails = elasticsearchRestTemplate.search(queryMail, EsDto.class);
            if (mails.getTotalHits() <= 1) {
                // This mail was already deduplicated after the aggregation ran; skip it rather
                // than aborting the whole run.
                continue;
            }
            List<EsDto> collect = mails.get().map(e -> e.getContent()).collect(Collectors.toList());
            // The first hit has the smallest id (ascending sort) and is kept; the rest are removed.
            Collection<Integer> ids = collect.stream().skip(1).map(EsDto::getId).collect(Collectors.toSet());
            if (ids.isEmpty()) {
                continue;
            }
            submittedThisPass += ids.size();
            int synchroId = collect.get(0).getId();
            esManager.synchronizationUpByBach(ids, synchroId);
        }
        if (submittedThisPass == 0) {
            // No progress in this pass: stop instead of re-running the same aggregation forever.
            break;
        }
        num += submittedThisPass;
    }
    log.info(" syncUp mail end num: {} ", num);
}
/**
 * Builds a paged search for documents whose {@code mail.keyword} field is constrained to
 * {@code mail} via {@code EsQueryUtil.setMatchPhraseQueryValue}. Results are sorted by
 * {@code id} ascending, and exact total-hit counting is enabled.
 *
 * @param page     zero-based page index
 * @param pageSize number of documents per page
 * @param mail     mail value to filter on
 * @return the configured query
 */
private NativeSearchQuery getQuery(int page, int pageSize, String mail) {
    BoolQueryBuilder mailFilter = QueryBuilders.boolQuery();
    EsQueryUtil.setMatchPhraseQueryValue("mail.keyword", mail, mailFilter);
    NativeSearchQuery result = new NativeSearchQuery(mailFilter);
    Pageable byIdAscending = PageRequest.of(page, pageSize, Sort.by(Sort.Direction.ASC, "id"));
    result.setPageable(byIdAscending);
    result.setTrackTotalHits(true);
    return result;
}
Manager：
/**
 * Deletes the documents with the given ids in a single batch.
 *
 * <p>Does nothing when {@code ids} is null or empty, or when {@code newId} is 0. In this
 * method {@code newId} is only written to the log; it is not stored anywhere.
 *
 * @param ids   ids of the duplicate documents to delete
 * @param newId id of the document that is kept (logged only)
 */
@Async
public void synchronizationUpByBach(Collection<Integer> ids, int newId) {
    if (ids == null || ids.isEmpty() || newId == 0) {
        return;
    }
    List<Es> entities = new ArrayList<>(ids.size());
    for (Integer id : ids) {
        // Only the id is needed to identify the document to delete.
        Es es = new Es();
        es.setId(id);
        entities.add(es);
    }
    log.info(" synchronizationUp success old:{} - new:{} ", StringUtils.join(ids,","), newId);
    deleteAll(entities);
}
/**
 * Deletes the given entities through {@code iEsDao} on a best-effort basis.
 *
 * <p>A failure is logged together with its stack trace and is not rethrown, so callers
 * continue even if the delete fails.
 *
 * @param entities entities to delete; nothing is done when null or empty
 */
public void deleteAll(List<Es> entities) {
    if (entities == null || entities.isEmpty()) {
        return;
    }
    try {
        iEsDao.deleteAll(entities);
    } catch (Exception e) {
        // Best-effort delete: keep going, but record the full cause instead of only its message.
        log.error("deleteAll failed for {} entities", entities.size(), e);
    }
}
Domain：
// indexName 为 es 索引名（indexName is the Elasticsearch index name）
/**
 * Document mapped to the Elasticsearch index {@code res}.
 *
 * <p>Explicit accessors are provided because callers use {@code setId}. If the class is also
 * annotated with Lombok (e.g. {@code @Data}), Lombok skips generating methods that already
 * exist, so these do not conflict.
 */
@Document(indexName = "res")
public class Es {
    /** Document identifier (annotated with {@code @Id}). */
    @Id
    private int id;
    private int status;
    private String name;
    private String mail;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMail() {
        return mail;
    }

    public void setMail(String mail) {
        this.mail = mail;
    }
}