扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
福哥答案2020-08-20:
在连平等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都做网站、网站设计 网站设计制作按需设计,公司网站建设,企业网站建设,高端网站设计,全网营销推广,成都外贸网站建设,连平网站建设费用合理。
1.golang的协程是基于gpm机制,是可以多核多线程的。Python的协程是eventloop模型(IO多路复用技术)实现,协程是严格的 1:N 关系,也就是一个线程对应了多个协程。虽然可以实现异步I/O,但是不能有效利用多核(GIL)。
2.golang用go func。python用import asyncio,async/await表达式。
评论
协程,又称微线程,纤程。英文名 Coroutine 。Python对协程的支持是通过 generator 实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用 next()函数 获取由 yield 语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。yield其实是终端当前的函数,返回给调用方。python3中使用yield来实现range,节省内存,提高性能,懒加载的模式。
asyncio是Python 3.4 版本引入的 标准库 ,直接内置了对异步IO的支持。
从Python 3.5 开始引入了新的语法 async 和 await ,用来简化yield的语法:
import asyncio
import threading
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
print(threading.current_thread().name)
await asyncio.sleep(x + y)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
print(threading.current_thread().name)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
线程是内核进行抢占式的调度的,这样就确保了每个线程都有执行的机会。而 coroutine 运行在同一个线程中,由语言的运行时中的 EventLoop(事件循环) 来进行调度。和大多数语言一样,在 Python 中,协程的调度是非抢占式的,也就是说一个协程必须主动让出执行机会,其他协程才有机会运行。
让出执行的关键字就是 await。也就是说一个协程如果阻塞了,持续不让出 CPU,那么整个线程就卡住了,没有任何并发。
PS: 作为服务端,event loop最核心的就是IO多路复用技术,所有来自客户端的请求都由IO多路复用函数来处理;作为客户端,event loop的核心在于利用Future对象延迟执行,并使用send函数激发协程,挂起,等待服务端处理完成返回后再调用CallBack函数继续下面的流程
Go语言的协程是 语言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的协程是eventloop模型),但是erlang是基于进程的消息通信,go是基于goroutine和channel的通信。
Python和Go都引入了消息调度系统模型,来避免锁的影响和进程/线程开销大的问题。
协程从本质上来说是一种用户态的线程,不需要系统来执行抢占式调度,而是在语言层面实现线程的调度 。因为协程 不再使用共享内存/数据 ,而是使用 通信 来共享内存/锁,因为在一个超级大系统里具有无数的锁,共享变量等等会使得整个系统变得无比的臃肿,而通过消息机制来交流,可以使得每个并发的单元都成为一个独立的个体,拥有自己的变量,单元之间变量并不共享,对于单元的输入输出只有消息。开发者只需要关心在一个并发单元的输入与输出的影响,而不需要再考虑类似于修改共享内存/数据对其它程序的影响。
切换到新语言始终是一大步,尤其是当您的团队成员只有一个时有该语言的先前经验。现在,Stream 的主要编程语言从 Python 切换到了 Go。这篇文章将解释stream决定放弃 Python 并转向 Go 的一些原因。
Go 非常快。性能类似于 Java 或 C++。对于用例,Go 通常比 Python 快 40 倍。
对于许多应用程序来说,编程语言只是应用程序和数据库之间的粘合剂。语言本身的性能通常并不重要。然而,Stream 是一个API 提供商,为 700 家公司和超过 5 亿最终用户提供提要和聊天平台。多年来,我们一直在优化 Cassandra、PostgreSQL、Redis 等,但最终,您会达到所使用语言的极限。Python 是一门很棒的语言,但对于序列化/反序列化、排名和聚合等用例,它的性能相当缓慢。我们经常遇到性能问题,Cassandra 需要 1 毫秒来检索数据,而 Python 会花费接下来的 10 毫秒将其转换为对象。
看看我如何开始 Go 教程中的一小段 Go 代码。(这是一个很棒的教程,也是学习 Go 的一个很好的起点。)
如果您是 Go 新手,那么在阅读那个小代码片段时不会有太多让您感到惊讶的事情。它展示了多个赋值、数据结构、指针、格式和一个内置的 HTTP 库。当我第一次开始编程时,我一直喜欢使用 Python 更高级的功能。Python 允许您在编写代码时获得相当的创意。例如,您可以:
这些功能玩起来很有趣,但是,正如大多数程序员会同意的那样,在阅读别人的作品时,它们通常会使代码更难理解。Go 迫使你坚持基础。这使得阅读任何人的代码并立即了解发生了什么变得非常容易。 注意:当然,它实际上有多“容易”取决于您的用例。如果你想创建一个基本的 CRUD API,我仍然推荐 Django + DRF或 Rails。
作为一门语言,Go 试图让事情变得简单。它没有引入许多新概念。重点是创建一种非常快速且易于使用的简单语言。它唯一具有创新性的领域是 goroutine 和通道。(100% 正确CSP的概念始于 1977 年,所以这项创新更多是对旧思想的一种新方法。)Goroutines 是 Go 的轻量级线程方法,通道是 goroutines 之间通信的首选方式。Goroutines 的创建非常便宜,并且只需要几 KB 的额外内存。因为 Goroutine 非常轻量,所以有可能同时运行数百甚至数千个。您可以使用通道在 goroutine 之间进行通信。Go 运行时处理所有复杂性。goroutines 和基于通道的并发方法使得使用所有可用的 CPU 内核和处理并发 IO 变得非常容易——所有这些都不会使开发复杂化。与 Python/Java 相比,在 goroutine 上运行函数需要最少的样板代码。您只需在函数调用前加上关键字“go”:
Go 的并发方法很容易使用。与 Node 相比,这是一种有趣的方法,开发人员必须密切关注异步代码的处理方式。Go 中并发的另一个重要方面是竞争检测器。这样可以很容易地确定异步代码中是否存在任何竞争条件。
我们目前用 Go 编写的最大的微服务编译需要 4 秒。与以编译速度慢而闻名的 Java 和 C++ 等语言相比,Go 的快速编译时间是一项重大的生产力胜利。我喜欢在程序编译的时候摸鱼,但在我还记得代码应该做什么的同时完成事情会更好。
首先,让我们从显而易见的开始:与 C++ 和 Java 等旧语言相比,Go 开发人员的数量并不多。根据StackOverflow的数据, 38% 的开发人员知道 Java, 19.3% 的人知道 C++,只有 4.6% 的人知道 Go。GitHub 数据显示了类似的趋势:Go 比 Erlang、Scala 和 Elixir 等语言使用更广泛,但不如 Java 和 C++ 流行。幸运的是,Go 是一种非常简单易学的语言。它提供了您需要的基本功能,仅此而已。它引入的新概念是“延迟”声明和内置的并发管理与“goroutines”和通道。(对于纯粹主义者来说:Go 并不是第一种实现这些概念的语言,只是第一种使它们流行起来的语言。)任何加入团队的 Python、Elixir、C++、Scala 或 Java 开发人员都可以在一个月内在 Go 上发挥作用,因为它的简单性。与许多其他语言相比,我们发现组建 Go 开发人员团队更容易。如果您在博尔德和阿姆斯特丹等竞争激烈的生态系统中招聘人员,这是一项重要的优势。
对于我们这样规模的团队(约 20 人)来说,生态系统很重要。如果您必须重新发明每一个小功能,您根本无法为您的客户创造价值。Go 对我们使用的工具有很好的支持。实体库已经可用于 Redis、RabbitMQ、PostgreSQL、模板解析、任务调度、表达式解析和 RocksDB。与 Rust 或 Elixir 等其他较新的语言相比,Go 的生态系统是一个重大胜利。它当然不如 Java、Python 或 Node 之类的语言好,但它很可靠,而且对于许多基本需求,你会发现已经有高质量的包可用。
Gofmt 是一个很棒的命令行实用程序,内置在 Go 编译器中,用于格式化代码。就功能而言,它与 Python 的 autopep8 非常相似。我们大多数人并不真正喜欢争论制表符与空格。格式的一致性很重要,但实际的格式标准并不那么重要。Gofmt 通过使用一种正式的方式来格式化您的代码来避免所有这些讨论。
Go 对协议缓冲区和 gRPC 具有一流的支持。这两个工具非常适合构建需要通过 RPC 通信的微服务。您只需要编写一个清单,在其中定义可以进行的 RPC 调用以及它们采用的参数。然后从这个清单中自动生成服务器和客户端代码。生成的代码既快速又具有非常小的网络占用空间并且易于使用。从同一个清单中,您甚至可以为许多不同的语言生成客户端代码,例如 C++、Java、Python 和 Ruby。因此,内部流量不再有模棱两可的 REST 端点,您每次都必须编写几乎相同的客户端和服务器代码。.
Go 没有像 Rails 用于 Ruby、Django 用于 Python 或 Laravel 用于 PHP 那样的单一主导框架。这是 Go 社区内激烈争论的话题,因为许多人主张你不应该一开始就使用框架。我完全同意这对于某些用例是正确的。但是,如果有人想构建一个简单的 CRUD API,他们将更容易使用 Django/DJRF、Rails Laravel 或Phoenix。对于 Stream 的用例,我们更喜欢不使用框架。然而,对于许多希望提供简单 CRUD API 的新项目来说,缺乏主导框架将是一个严重的劣势。
Go 通过简单地从函数返回错误并期望调用代码来处理错误(或将其返回到调用堆栈)来处理错误。虽然这种方法有效,但很容易失去问题的范围,以确保您可以向用户提供有意义的错误。错误包通过允许您向错误添加上下文和堆栈跟踪来解决此问题。另一个问题是很容易忘记处理错误。像 errcheck 和 megacheck 这样的静态分析工具可以方便地避免犯这些错误。虽然这些变通办法效果很好,但感觉不太对劲。您希望该语言支持正确的错误处理。
Go 的包管理绝不是完美的。默认情况下,它无法指定特定版本的依赖项,也无法创建可重现的构建。Python、Node 和 Ruby 都有更好的包管理系统。但是,使用正确的工具,Go 的包管理工作得很好。您可以使用Dep来管理您的依赖项,以允许指定和固定版本。除此之外,我们还贡献了一个名为的开源工具VirtualGo,它可以更轻松地处理用 Go 编写的多个项目。
我们进行的一个有趣的实验是在 Python 中使用我们的排名提要功能并在 Go 中重写它。看看这个排名方法的例子:
Python 和 Go 代码都需要执行以下操作来支持这种排名方法:
开发 Python 版本的排名代码大约花了 3 天时间。这包括编写代码、单元测试和文档。接下来,我们花了大约 2 周的时间优化代码。其中一项优化是将分数表达式 (simple_gauss(time)*popularity) 转换为抽象语法树. 我们还实现了缓存逻辑,可以在未来的特定时间预先计算分数。相比之下,开发此代码的 Go 版本大约需要 4 天时间。性能不需要任何进一步的优化。因此,虽然 Python 的最初开发速度更快,但基于 Go 的版本最终需要我们团队的工作量大大减少。另外一个好处是,Go 代码的执行速度比我们高度优化的 Python 代码快大约 40 倍。现在,这只是我们通过切换到 Go 体验到的性能提升的一个示例。
与 Python 相比,我们系统的其他一些组件在 Go 中构建所需的时间要多得多。作为一个总体趋势,我们看到 开发 Go 代码需要更多的努力。但是,我们花更少的时间 优化 代码以提高性能。
我们评估的另一种语言是Elixir.。Elixir 建立在 Erlang 虚拟机之上。这是一种迷人的语言,我们之所以考虑它,是因为我们的一名团队成员在 Erlang 方面拥有丰富的经验。对于我们的用例,我们注意到 Go 的原始性能要好得多。Go 和 Elixir 都可以很好地服务数千个并发请求。但是,如果您查看单个请求的性能,Go 对于我们的用例来说要快得多。我们选择 Go 而不是 Elixir 的另一个原因是生态系统。对于我们需要的组件,Go 有更成熟的库,而在许多情况下,Elixir 库还没有准备好用于生产环境。培训/寻找开发人员使用 Elixir 也更加困难。这些原因使天平向 Go 倾斜。Elixir 的 Phoenix 框架看起来很棒,绝对值得一看。
Go 是一种非常高性能的语言,对并发有很好的支持。它几乎与 C++ 和 Java 等语言一样快。虽然与 Python 或 Ruby 相比,使用 Go 构建东西确实需要更多时间,但您将节省大量用于优化代码的时间。我们在Stream有一个小型开发团队,为超过 5 亿最终用户提供动力和聊天。Go 结合了 强大的生态系统 、新开发人员的 轻松入门、快速的性能 、对并发的 可靠支持和高效的编程环境 ,使其成为一个不错的选择。Stream 仍然在我们的仪表板、站点和机器学习中利用 Python 来提供个性化的订阅源. 我们不会很快与 Python 说再见,但今后所有性能密集型代码都将使用 Go 编写。我们新的聊天 API也完全用 Go 编写。
一、Kafka简述
1. 为什么需要用到消息队列
异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。
2.为什么选择kafka呢?
这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ fmt.Println("Failed to start consumer: %s", err)return} partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() }(pc) } wg.Wait()}funcmain(){ consumer()}
消费组
funcconsumerCluster(){ groupID :="group-1"config := cluster.NewConfig() config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ errors := c.Errors() noti := c.Notifications()for{select{caseerr := -errors: glog.Errorln(err)case-noti: } } }(c)formsg :=rangec.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){ config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{} msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ fmt.Println("producer close err, ", err)return}deferclient.Close() pid, offset, err := client.SendMessage(msg)iferr !=nil{ fmt.Println("send message failed, ", err)return} fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ errors := p.Errors() success := p.Successes()for{select{caseerr := -errors:iferr !=nil{ glog.Errorln(err) }case-success: } } }(p)for{ v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() - msg time.Sleep(time.Second *1) }}funcmain(){goasyncProducer()select{ }}
3.5. 结果展示-
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998
消费打印:
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流