加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程 > 正文

MongoDB Stream是怎样实现美满数据增量迁徙的?

发布时间:2018-08-18 09:03:22 所属栏目:编程 来源:zale
导读:技能沙龙 | 邀您于8月25日与国美/AWS/转转三位专家配合切磋小措施电商拭魅战 一、配景先容 最近微处事架构火得不可,但本质上壹贝偾风口上的一个热门词汇。 作为笔者的履素来说,想要应用一个新的架构必要带来的厘革本钱长短常高的。 尽量云云,今朝照旧有许

每一个改观使命会不绝对topic发生写操纵,触发一系列ChangeEvent发生:

  • doInsert:天生随机频道的topic后,执行insert;
  • doUpdate:随机取得一个topic,将其channel字段改为随机值,执行update;
  • doReplace:随机取得一个topic,将其channel字段改为随机值,执行replace;
  • doDelete:随机取得一个topic,执行delete。

以doUpdate为例,实当代码如下:

MongoDB Stream是怎样实现美满数据增量迁徙的?

启动一个全量迁徙使命,将topic表中数据迁徙到topic_new新表:

MongoDB Stream是怎样实现美满数据增量迁徙的?

在全量迁徙开始前,先获适合前时候的的最大 _id 值(可以将此值记录下来)作为终点,随后逐个完成迁徙转换。

在全量迁徙完成后,便开始最后一步:增量迁徙。

注:增量迁徙进程中,改观操纵如故在举办。

  1. final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);  
  2. final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);  
  3. ObjectId currentId = null;  
  4. Document sort = new Document("_id", 1);  
  5. MongoCursor<Document> cursor = null;  
  6. // 批量巨细  
  7. int batchSize = 100;AtomicInteger count = new AtomicInteger(0);  
  8. try {  
  9.     while (true) {  
  10.         boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;  
  11.         // 按ID增量分段拉取  
  12.         if (currentId == null) {  
  13.             cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();  
  14.         } else {  
  15.             cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))  
  16.                     .sort(sort).limit(batchSize).iterator();  
  17.         }  
  18.         boolean hasIncrRecord = false;  
  19.         while (cursor.hasNext()) {  
  20.             hasIncrRecord = true;  
  21.             Document incrDoc = cursor.next();  
  22.             OperationType opType = OperationType.fromString(incrDoc.getString(field_op));  
  23.             ObjectId docId = incrDoc.getObjectId(field_key);  
  24.             // 记录当前ID  
  25.             currentId = incrDoc.getObjectId("_id"); 
  26.             if (opType == OperationType.DELETE) {  
  27.                 topicNewCollection.deleteOne(new Document("_id", docId));  
  28.             } else {  
  29.                 Document doc = incrDoc.get(field_data, Document.class);  
  30.                 // channel转换  
  31.                 String oldChannel = doc.getString(field_channel);  
  32.                 doc.put(field_channel, Channel.toNewName(oldChannel));  
  33.                 // 启用upsert  
  34.                 UpdateOptions options = new UpdateOptions().upsert(true);  
  35.                 topicNewCollection.replaceOne(new Document("_id", docId),  
  36.                         incrDoc.get(field_data, Document.class), options);  
  37.             }  
  38.             if (count.incrementAndGet() % 10 == 0) {  
  39.                 logger.info("IncrTransferTask progress, count: {}", count.get());  
  40.             }  
  41.         }  
  42.         // 当watch遏制事变(没有更多改观),同时也没有必要处理赏罚的记录时,跳出  
  43.         if (!isWatchTaskStillRunning && !hasIncrRecord) {  
  44.             break;  
  45.         }  
  46.         sleep(200);  
  47.     } 
  48.  } catch (Exception e) {  
  49.     logger.error("IncrTransferTask ERROR", e);  

增量迁徙的实现是一个不绝tail的进程,操作 **_id 字段的有序特征 ** 举办分段迁徙;即记录下当前处理赏罚的_id值,轮回拉取在该_id值之后的记录举办处理赏罚。

增量表(topic_incr)中除了DELETE改观之外,别的的范例都保存了整个文档,因此可直接操作replace + upsert追加到新表。

最后,运行整个措施。

MongoDB Stream是怎样实现美满数据增量迁徙的?

查察topic表和topic_new表,发明两者数目是沟通的。为了进一步确认同等性,我们对两个表的别离做一次聚合统计:

topic表

MongoDB Stream是怎样实现美满数据增量迁徙的?

topic_new表

MongoDB Stream是怎样实现美满数据增量迁徙的?

前者输出功效:

MongoDB Stream是怎样实现美满数据增量迁徙的?

后者输出功效:

MongoDB Stream是怎样实现美满数据增量迁徙的?

前后比拟的功效是同等的。

五、后续优化

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读