sharetwitterlinkedIn

Pulsar 2.7.0 新增特性概览:事务支持、Topic 级别策略配置等

December 25, 2020
head img

Apache Pulsar 2.7.0 支持事务、Topic 级别策略配置、Azure Blob 二级存储、支持原生 protobuf schema、支持 Pulsar Functions 端到端加密等,修复了 2.6.2 版本中的诸多问题,改进了一些功能,进一步丰富了 Pulsar 作为云原生流数据平台的功能。

Pulsar 2.7.0 版本中合并了来自社区的 450+ commit,下面一起来看看 2.7.0 版本有哪些更新吧!

支持事务

Pulsar 事务支持事件流应用程序实现原子操作,同时实现消费、处理、生产消息。采用事务语义,Pulsar 的单分区和多分区都可以实现 exactly-once 语义。因此 Pulsar 可以应用于新的使用场景,即客户端(作为 producer 或 consumer)处理多个 Topic 和分区中的消息,并保证将这些消息作为一个消息单元来处理。事务的引入不仅增强了 Pulsar 的消息语义,也会进一步增强 Pulsar Functions 的处理保证。

目前,Pulsar 事务还在开发阶段。Pulsar 社区将会继续开发并完善事务的特性,推进事务在生产环境中的使用。

在 Pulsar 中启用事务,需要在 broker.conf 文件中配置以下参数。

transactionCoordinatorEnabled=true

初始化事务协调组件的元数据后,事务协调组件可以利用分区 topic 的优势,例如负载平衡,可以是事务协调组件分布在多个 broker 中。

bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone

对于 Pulsar 客户端,也需要启用事务特性。

PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();

Pulsar 事务的示例如下:

// Open a transaction
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(5, TimeUnit.MINUTES)
        .build()
        .get();

//  Publish messages with the transaction
producer.newMessage(txn).value("Hello Pulsar Transaction".getBytes()).send();

// Consume and acknowledge messages with the transaction
Message<byte[]> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), txn);

// Commit the transaction
txn.commit()

关于 Pulsar 事务的更多信息,参阅 transaction 文档。关于 Pulsar 事务设计的更多信息,参阅 Pulsar PIP31

Topic 级别策略

Pulsar 2.7.0 引入了系统 Topic,用于维护所有策略更改事件,进而实现 Topic 级别的策略。目前,命名空间级别的策略都可以在 Topic 级别使用,因此用户可以在 Topic 级别自定义策略,而无需使用大量的元数据服务资源。使用 Topic 级别策略,用户可以更灵活地管理 Topic,同时不会给 ZooKeeper 增加负担。

在 Pulsar 中启用 Topic 级别策略,需要在 broker.conf 文件中配置以下参数。

systemTopicEnabled=true
topicLevelPoliciesEnabled=true

启用 Topic 级别策略后,可以通过 Pulsar Admin 更新 Topic 策略。为特定 Topic 设置数据保留的代码示例如下。

bin/pulsar-admin topics set-retention -s 10G -t 7d persistent://public/default/my-topic

关于 Topic 系统和 Topic 级别策略的更多信息,参阅 Pulsar PIP39

支持 Azure Blob 二级存储

Pulsar 2.7.0 支持 Azure Blob 二级存储。使用此特性,用户可以将历史数据卸载到 Azure Blob 存储,不仅为 Azure 云用户提供极大便利,还可以降低在 BookKeeper 中管理大量历史数据的成本。Pulsar 将在后续版本中为 Azure 云添加更多支持。

在 Pulsar 中使用 Azure Blob 存储,需要在 broker.conf 文件中配置以下参数。

managedLedgerOffloadDriver=azureblob

更多详细信息,参阅 PR-8436

支持原生 protobuf schema

Pulsar 2.7.0 支持原生 protobuf schema,简化了 protobuf 与 Pulsar 集成的操作,能让 protobuf 用户使用 Pulsar 的更多功能。下面的代码展示了如何在 Java 客户端中使用原生 protobuf schema。

Consumer<PBMessage> consumer = client.newConsumer(Schema.PROTOBUFNATIVE(PBMessage.class))
.topic(topic)
.subscriptionName("my-subscription-name")
.subscribe();

更多详细信息,参阅 PR-8372

资源限制

在 Pulsar 中,租户、命名空间和 Topic 是集群的核心资源。Pulsar 2.7.0 支持用户限制集群中租户的数量、租户中命名空间的数量、命名空间中 Topic 的数量,以及 Topic 中的订阅数量。

在 Pulsar 中配置资源限制,需要在 broker.conf 文件中配置以下参数。

maxTenants=0
maxNamespacesPerTenant=0
maxTopicsPerNamespace=0
maxSubscriptionsPerTopic=0

配置资源限制可以为 Pulsar 管理员管理资源提供极大便利。

Pulsar Functions 支持端到端加密

Pulsar 2.7.0 支持为 Pulsar Functions 添加端到端(End-to-End,e2e)加密。用户可以使用配置应用程序加密的公钥和私钥对。只有当密钥有效时,用户才可以解密加密的消息。

通过在命令行指定 --producer-config 即可在 Functions Worker 上启用端到端加密。更多详细信息,参阅 Pulsar Encryption

更多详细信息,参阅 PR-8432

Function 重平衡

2.7.0 版本发布前,Pulsar 没有用于重平衡 worker 上 function 调度程序的机制。Function 之间的工作负载可能相差较多。Pulsar 2.7.0 支持手动触发 function 重平衡,也可以周期性进行自动重平衡。

更多详细信息,参阅 PR-7388PR-7449

更多信息

如果您有任何关于 Pulsar 的疑问或建议,欢迎通过邮件或 Slack 与我们联系。

© StreamNative, Inc. 2022Apache, Apache Pulsar, Apache BookKeeper, Apache Flink, and associated open source project names are trademarks of the Apache Software Foundation.TermsPrivacy