加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程 > 正文

行使Kafka和MongoDB举办Go异步处理赏罚

发布时间:2018-08-24 23:25:40 所属栏目:编程 来源:Melvin Vivas
导读:技能沙龙 | 邀您于8月25日与国美/AWS/转转三位专家配合切磋小措施电商拭魅战 在我前面的博客文章我的第一个 Go 微处事:行使 MongoDB 和 Docker 多阶段构建 中,我建设了一个 Go 微处事示例,它宣布一个 REST 式的 http 端点,并将从 HTTP POST 中吸取到的数
副问题[/!--empirenews.page--] 技能沙龙 | 邀您于8月25日与国美/AWS/转转三位专家配合切磋小措施电商拭魅战

行使Kafka和MongoDB举办Go异步处理赏罚

在我前面的博客文章 “我的第一个 Go 微处事:行使 MongoDB 和 Docker 多阶段构建” 中,我建设了一个 Go 微处事示例,它宣布一个 REST 式的 http 端点,并将从 HTTP POST 中吸取到的数据生涯到 MongoDB 数据库。

在这个示例中,我将数据的生涯和 MongoDB 疏散,并建设另一个微处事行止理赏罚它。我还添加了 Kafka 为动静层处事,这样微处事就可以异步处理赏罚它本身材贴的对象了。

假如你偶然刻去看,我将这个博客文章的整个进程录制到 这个视频中了 :)

下面是这个行使了两个微处事的简朴的异步处理赏罚示例的上层架构图。

rest-kafka-mongo-microservice-draw-io

rest-kafka-mongo-microservice-draw-io

微处事 1 —— 是一个 REST 式微处事,它从一个 /POST http 挪用中吸取数据。吸取到哀求之后,它从 http 哀求中检索数据,并将它生涯到 Kafka。生涯之后,它通过 /POST 发送沟通的数据去相应挪用者。

微处事 2 —— 是一个订阅了 Kafka 中的一个主题的微处事,微处事 1 的数据生涯在该主题。一旦动静被微处事斲丧之后,它接着生涯数据到 MongoDB 中。

在你继承之前,我们必要可以或许去运行这些微处事的几件对象:

  1. 下载 Kafka —— 我行使的版本是 kafka_2.11-1.1.0
  2. 安装 librdkafka —— 不幸的是,这个库应该在方针体系中
  3. 安装 Kafka Go 客户端
  4. 运行 MongoDB。你可以去看我的 早年的文章 中关于这一块的内容,那篇文章中我行使了一个 MongoDB docker 镜像。

我们开始吧!

起首,启动 Kafka,在你运行 Kafka 处事器之前,你必要运行 Zookeeper。下面是示例:

  1. $ cd /<download path>/kafka_2.11-1.1.0
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我行使 9092 端口毗连到 Kafka。假如你必要改变端口,只必要在 config/server.properties 中设置即可。假如你像我一样是个新手,我提议你此刻照旧行使默认端口。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们必要 MongoDB。它很简朴,只必要行使这个 docker-compose.yml 即可。

  1. version: '3'
  2. services:
  3. mongodb:
  4. image: mongo
  5. ports:
  6. - "27017:27017"
  7. volumes:
  8. - "mongodata:/data/db"
  9. networks:
  10. - network1
  11.  
  12. volumes:
  13. mongodata:
  14.  
  15. networks:
  16. network1:

行使 Docker Compose 去运行 MongoDB docker 容器。

  1. docker-compose up

这里是微处事 1 的相干代码。我只是修改了我前面的示例去生涯到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
  2.  
  3. //Retrieve body from http request
  4. b, err := ioutil.ReadAll(r.Body)
  5. defer r.Body.Close()
  6. if err != nil {
  7. panic(err)
  8. }
  9.  
  10. //Save data into Job struct
  11. var _job Job
  12. err = json.Unmarshal(b, &_job)
  13. if err != nil {
  14. http.Error(w, err.Error(), 500)
  15. return
  16. }
  17.  
  18. saveJobToKafka(_job)
  19.  
  20. //Convert job struct into json
  21. jsonString, err := json.Marshal(_job)
  22. if err != nil {
  23. http.Error(w, err.Error(), 500)
  24. return
  25. }
  26.  
  27. //Set content-type http header
  28. w.Header().Set("content-type", "application/json")
  29.  
  30. //Send back data as response
  31. w.Write(jsonString)
  32.  
  33. }
  34.  
  35. func saveJobToKafka(job Job) {
  36.  
  37. fmt.Println("save to kafka")
  38.  
  39. jsonString, err := json.Marshal(job)
  40.  
  41. jobString := string(jsonString)
  42. fmt.Print(jobString)
  43.  
  44. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  45. if err != nil {
  46. panic(err)
  47. }
  48.  
  49. // Produce messages to topic (asynchronously)
  50. topic := "jobs-topic1"
  51. for _, word := range []string{string(jobString)} {
  52. p.Produce(&kafka.Message{
  53. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  54. Value: []byte(word),
  55. }, nil)
  56. }
  57. }

这里是微处事 2 的代码。在这个代码中最重要的对象是从 Kafka 中斲丧数据,生涯部门我已经在前面的博客文章中接头过了。这里代码的重点部门是从 Kafka 中斲丧数据:

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读