This commit is contained in:
ruying408
2024-08-25 17:01:21 +08:00
parent 5bb7aed870
commit a3179df928

View File

@@ -52,9 +52,9 @@ public class SegmentIDGenImpl implements IDGenService {
* 一个Segment维持时间为15分钟 * 一个Segment维持时间为15分钟
*/ */
private static final long SEGMENT_DURATION = 15 * 60 * 1000L; private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory()); private final ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());
private volatile boolean initOK = false; private volatile boolean initOK = false;
private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>(); private final Map<String, SegmentBuffer> cache = new ConcurrentHashMap<>();
private final LeafAllocMapper leafAllocMapper; private final LeafAllocMapper leafAllocMapper;
public static class UpdateThreadFactory implements ThreadFactory { public static class UpdateThreadFactory implements ThreadFactory {
@@ -105,18 +105,15 @@ public class SegmentIDGenImpl implements IDGenService {
try { try {
List<String> dbTags = leafAllocMapper.selectListByQuery(QueryWrapper.create().select( List<String> dbTags = leafAllocMapper.selectListByQuery(QueryWrapper.create().select(
LeafAllocEntity::getKey)).stream().map(LeafAllocEntity::getKey).toList(); LeafAllocEntity::getKey)).stream().map(LeafAllocEntity::getKey).toList();
if (dbTags == null || dbTags.isEmpty()) { if (dbTags.isEmpty()) {
return; return;
} }
List<String> cacheTags = new ArrayList<String>(cache.keySet()); List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags); Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags); Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache //db中新加的tags灌进cache
for(int i = 0; i < cacheTags.size(); i++){ for (String tmp : cacheTags) {
String tmp = cacheTags.get(i); insertTagsSet.remove(tmp);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
} }
for (String tag : insertTagsSet) { for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer(); SegmentBuffer buffer = new SegmentBuffer();
@@ -129,11 +126,8 @@ public class SegmentIDGenImpl implements IDGenService {
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
} }
//cache中已失效的tags从cache删除 //cache中已失效的tags从cache删除
for(int i = 0; i < dbTags.size(); i++){ for (String tmp : dbTags) {
String tmp = dbTags.get(i); removeTagsSet.remove(tmp);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
} }
for (String tag : removeTagsSet) { for (String tag : removeTagsSet) {
cache.remove(tag); cache.remove(tag);
@@ -239,26 +233,23 @@ public class SegmentIDGenImpl implements IDGenService {
try { try {
final Segment segment = buffer.getCurrent(); final Segment segment = buffer.getCurrent();
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
service.execute(new Runnable() { service.execute(() -> {
@Override Segment next = buffer.getSegments()[buffer.nextPos()];
public void run() { boolean updateOk = false;
Segment next = buffer.getSegments()[buffer.nextPos()]; try {
boolean updateOk = false; updateSegmentFromDb(buffer.getKey(), next);
try { updateOk = true;
updateSegmentFromDb(buffer.getKey(), next); logger.info("update segment {} from db {}", buffer.getKey(), next);
updateOk = true; } catch (Exception e) {
logger.info("update segment {} from db {}", buffer.getKey(), next); logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} catch (Exception e) { } finally {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); if (updateOk) {
} finally { buffer.wLock().lock();
if (updateOk) { buffer.setNextReady(true);
buffer.wLock().lock(); buffer.getThreadRunning().set(false);
buffer.setNextReady(true); buffer.wLock().unlock();
buffer.getThreadRunning().set(false); } else {
buffer.wLock().unlock(); buffer.getThreadRunning().set(false);
} else {
buffer.getThreadRunning().set(false);
}
} }
} }
}); });