May 24, 2021

What’s New in Apache Pulsar 2.7.2

Yong Zhang
Software Engineer, StreamNative
Yu Liu

We are excited that the Apache Pulsar community has released version 2.7.2! More than 38 contributors provided improvements and bug fixes across 85 commits.

Highlights of this release:

  • Consumers are no longer blocked after receiving multiple retry messages in Docker.
  • Consumers using the Key_Shared subscription type on a non-persistent topic now receive the messages that the topic stats report as published.

This blog walks through the most noteworthy changes grouped by the key functionality. For the complete list, including all enhancements and bug fixes, check out the Pulsar 2.7.2 Release Notes.

Notable bug fixes and enhancements

Pulsar 2.7.2 includes the following changes for broker, bookie, proxy, Pulsar admin, Pulsar SQL, and clients.

Broker

  • Fix NPEs and thread safety issues in PersistentReplicator. PR-9763
  • Previously, PersistentReplicator could throw NullPointerExceptions (NPEs) because its cursor field was updated asynchronously in another thread and statistics could be read before the cursor was available. This PR makes the following changes:
  • Make cursor field volatile since the field is updated asynchronously in another thread.
  • Remove the unnecessary synchronization on the openCursorAsync method since it is not needed.
  • Add null checks before accessing the cursor field since statistics might be updated before the cursor is available.
  • Fix the issue of a message not dispatched for the Key_Shared subscription type in a non-persistent topic. PR-9826
  • Previously, in a non-persistent topic with a Key_Shared subscription, messages were marked as published in the topic stats, but consumers did not consume them. This PR fixes this issue.
  • Fix the issue of a consumer being blocked after receiving retry messages. PR-10078
  • Previously, in the Docker environment, if a consumer enabled the retry feature and set the retry topic in DeadLetterPolicy, the consumer was blocked after receiving multiple retry messages because the hasMessageAvailable check was set to false. This PR fixes this issue.
  • Fix the issue of schema not added when subscribing to an empty topic without schema. PR-9853
  • Previously, when a consumer with a schema subscribed to an empty topic without schema, the previous check used isActive, which only checked whether the topic could be deleted. However, it should check if there was any connected producer or consumer of this topic. For the previous implementation, even if a topic had no active producers or consumers, the topic's subscription list was not empty and isActive returned true. Then the consumer's schema was not attached to the topic and it threw an IncompatibleSchemaException.
  • This PR changes to check if the topic has active producers or consumers instead of checking whether it can be deleted.
  • Fix the issue of schema type check when using the ALWAYS_COMPATIBLE strategy. PR-10367
  • This PR provides the following enhancements when using the ALWAYS_COMPATIBLE strategy for schema type check:
  • For non-transitive strategy, it checks only schema type for the last schema.
  • For transitive strategy, it checks all schema types.
  • For getting schema by schema data, it considers different schema types.
  • Fix the issue of 100% CPU usage when deleting a namespace. PR-10337
  • Previously, when a namespace was deleted, its Policies setting was marked as deleted, which triggered the topic's onPoliciesUpdate and a read of the ZooKeeper Policies node data in checkReplicationAndRetryOnFailure. Because the namespace was deleted, the ZooKeeper node no longer existed and the failure to read data triggered infinite retries. This PR fixes this issue by adding a method to check for non-deleted policies.
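
The difference between the two checks described for PR-9853 can be illustrated with a toy model. This is a minimal sketch, not broker code: the `Topic` class, its fields, and the function names are hypothetical stand-ins chosen for illustration, and the real broker logic is written in Java.

```python
from dataclasses import dataclass, field


@dataclass
class Topic:
    """Toy stand-in for a broker topic (hypothetical, not Pulsar's class)."""
    producers: list = field(default_factory=list)
    consumers: list = field(default_factory=list)
    subscriptions: list = field(default_factory=list)

    def can_be_deleted(self) -> bool:
        # Roughly what the old isActive-style check looked at: leftover
        # subscriptions keep the topic from being deletable.
        return not (self.producers or self.consumers or self.subscriptions)

    def has_connected_clients(self) -> bool:
        # What the fix looks at instead: is anyone actually connected?
        return bool(self.producers or self.consumers)


def may_attach_schema_old(topic: Topic) -> bool:
    # Old behavior (assumed shape): attach only if the topic is deletable.
    return topic.can_be_deleted()


def may_attach_schema_new(topic: Topic) -> bool:
    # New behavior (assumed shape): attach if no producer or consumer is connected.
    return not topic.has_connected_clients()


# An empty topic that still has a subscription but no connected clients.
idle = Topic(subscriptions=["my-sub"])
print(may_attach_schema_old(idle))  # False
print(may_attach_schema_new(idle))  # True
```

In this model, the leftover subscription is what makes the old check refuse to attach the consumer's schema, which matches the IncompatibleSchemaException scenario described above.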

Bookie

  • Fallback to PULSAR_GC if BOOKIE_GC is not defined. PR-9621
  • This PR changes fallback from PULSAR_MEM to PULSAR_GC if BOOKIE_GC is not defined.
  • Fallback to PULSAR_EXTRA_OPTS if BOOKIE_EXTRA_OPTS is not defined. PR-10397
  • With this PR, the -Dio.netty.* system properties are not passed if PULSAR_EXTRA_OPTS or BOOKIE_EXTRA_OPTS is set. This change ensures consistency with PULSAR_EXTRA_OPTS behavior and prevents duplicate properties.
  • This PR also adds -Dio.netty.leakDetectionLevel=disabled (unless BOOKIE_EXTRA_OPTS is set) since PULSAR_EXTRA_OPTS does not include that setting by default.
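
The fallback order described above can be modeled in a few lines. This is a sketch of one reading of the two PRs, not the startup script itself (the real logic lives in shell scripts). The helper names and the `DEFAULT_NETTY_OPTS` value are hypothetical placeholders; only the environment variable names and `-Dio.netty.leakDetectionLevel=disabled` come from the release notes.

```python
# Placeholder for the script's built-in Netty defaults (hypothetical value).
DEFAULT_NETTY_OPTS = "-Dio.netty.example.default=1"
LEAK_DETECTION_OFF = "-Dio.netty.leakDetectionLevel=disabled"


def first_set(env: dict, *names: str, default: str = "") -> str:
    """Return the first non-empty variable, like nested ${VAR:-fallback}."""
    for name in names:
        value = env.get(name)
        if value:
            return value
    return default


def resolve_bookie_opts(env: dict) -> dict:
    # BOOKIE_GC wins; otherwise fall back to PULSAR_GC.
    gc = first_set(env, "BOOKIE_GC", "PULSAR_GC")
    if env.get("BOOKIE_EXTRA_OPTS"):
        # An explicit bookie setting is used as-is, with nothing appended.
        extra = env["BOOKIE_EXTRA_OPTS"]
    else:
        # Fall back to PULSAR_EXTRA_OPTS; the Netty defaults apply only
        # when neither variable is set.
        inherited = first_set(env, "PULSAR_EXTRA_OPTS", default=DEFAULT_NETTY_OPTS)
        extra = f"{LEAK_DETECTION_OFF} {inherited}"
    return {"gc": gc, "extra": extra}


env = {"PULSAR_GC": "-XX:+UseG1GC", "PULSAR_EXTRA_OPTS": "-Dfoo=bar"}
print(resolve_bookie_opts(env))
# {'gc': '-XX:+UseG1GC', 'extra': '-Dio.netty.leakDetectionLevel=disabled -Dfoo=bar'}
```

Treating an empty variable the same as an unset one mirrors the usual `${VAR:-fallback}` shell idiom, which is an assumption about how the scripts are written.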

Proxy

  • Fix authorization error while using proxy and Prefix subscription authentication mode. PR-10226
  • Previously, when using Pulsar proxy and Prefix subscription authentication mode, org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#canConsumeAsync threw an exception, which caused a consumer error.
  • This PR updates the org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#allowTopicOperationAsync logic, checks isSuperUser first, and then returns isAuthorizedFuture.
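
The ordering change can be sketched as follows. This is an illustrative Python model under stated assumptions, not the Java provider: `SUPER_USERS`, `can_consume`, and `allow_consume` are hypothetical names, and the prefix rule is simplified to "the subscription name must start with the role name".

```python
import asyncio

SUPER_USERS = {"proxy-admin"}  # hypothetical super-user roles


async def can_consume(role: str, subscription: str) -> bool:
    # Prefix mode (simplified): the subscription name must start with the role.
    if not subscription.startswith(role):
        raise PermissionError(f"{subscription!r} is not prefixed with {role!r}")
    return True


async def allow_consume(role: str, subscription: str) -> bool:
    # Super users short-circuit before the per-role check can raise.
    if role in SUPER_USERS:
        return True
    return await can_consume(role, subscription)


print(asyncio.run(allow_consume("proxy-admin", "orders-sub")))  # True
```

Without the early super-user check, the same call would reach `can_consume` and raise, which is the kind of failure the release notes describe for connections made through the proxy.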

Pulsar admin

  • Add get version command for Pulsar REST API, pulsar-admin, and pulsar-client. PR-9975

Pulsar SQL

  • Fix the issue of BKNoSuchLedgerExistsException. PR-9910
  • Previously, when using Pulsar SQL to query messages, BKNoSuchLedgerExistsException was thrown if the ZooKeeper ledger root directory was changed. This PR fixes this issue.

Client

Pulsar 2.7.2 includes the following changes for Java, Python, C++, and WebSocket clients.

Java

  • Fix the issue that ClientConfigurationData's objects are not equal. PR-10091
  • This PR fixes this issue and reuses AuthenticationDisabled.INSTANCE as default instead of creating a new one.
  • Fix the issue of AutoConsumeSchema KeyValue encoding. PR-10089
  • This PR keeps the KeyValueEncodingType when auto-consuming a KeyValue schema.
  • Fix the OutOfMemoryError thrown while using KeyValue<GenericRecord, GenericRecord>. PR-9981
  • Previously, a topic with schema KeyValue<GenericRecord, GenericRecord> could not be consumed due to a problem in HttpLookupService. The HttpLookupService downloaded the schema in JSON format but the KeyValue schema was expected to be encoded in binary form.
  • This PR uses the existing utility functions to convert the JSON representation of the KeyValue schema to the desired format.
  • Fix the concurrency issue in the client's producer epoch handling. PR-10436
  • This PR uses a volatile field for epoch and AtomicLongFieldUpdater for incrementing the value.
  • Handle NPE while receiving ack for a closed producer. PR-8979
  • Fix the issue of batch size not set when deserializing from a byte array. PR-9855
  • Previously, batch index message acknowledgment was added to the seek method to support more precise seek using ACK sets. However, when the seek was performed by a message that was serialized and deserialized, the batchSize was set to zero, which led to a discrepancy between messageId forms and seek results. This PR fixes this issue.
  • Fix the issue of a single-topic consumer being unable to close. PR-9849
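
The ClientConfigurationData equality fix (PR-10091) relies on a general idea that is easy to show outside Java: if a default field value is a fresh object with identity-based equality, two otherwise identical configurations compare unequal, and sharing one instance fixes that. The following is a Python analogy only; every class and field name here is hypothetical and none of it is the Java client's code.

```python
from dataclasses import dataclass, field


class AuthenticationDisabled:
    """Defines no __eq__, so two instances compare by identity."""


DISABLED = AuthenticationDisabled()  # plays the role of a shared INSTANCE


@dataclass
class ConfigFresh:
    service_url: str = "pulsar://localhost:6650"
    # A new authentication object is created for every configuration.
    authentication: AuthenticationDisabled = field(default_factory=AuthenticationDisabled)


@dataclass
class ConfigShared:
    service_url: str = "pulsar://localhost:6650"
    # Every configuration reuses the same default object.
    authentication: AuthenticationDisabled = DISABLED


print(ConfigFresh() == ConfigFresh())    # False
print(ConfigShared() == ConfigShared())  # True
```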

Python

  • Support setting the default value when using Python Avro Schema. PR-10265
  • Previously, the default value for the Python Avro schema could not be set, causing the Python schema to not be updated.
  • This PR fixes this issue and adds the following changes:
  • Add the required field to control whether the schema type can be null.
  • Add the required_default field to control whether the schema has a default attribute.
  • Add the default field to control the default value of the schema.
  • Fix the issue that a nested Map or Array in a schema does not work. PR-9548
  • Previously, the Python client did not handle nested Map or Array well, and the generated schema string was invalid. When the Map/Array's schema() method set the values field of the schema string, it ignored the Record type but not Map and Array.
  • This PR fixes the issue and adds 4 tests for Map<Map>, Map<Array>, Array<Array>, and Array<Map> to cover all nested cases that involve Map or Array.
  • Add TLS SNI support for Python and C++ clients. PR-8957
  • This PR adds TLS SNI support for C++ and Python clients, so you can connect to brokers through the proxy.
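
To make the three options from PR-10265 concrete, here is one plausible reading of how they could map onto an Avro field definition. This is a standalone sketch: the `avro_field` helper is hypothetical and is not the Python client's implementation; only the option names `required`, `required_default`, and `default` come from the release notes.

```python
import json


def avro_field(name, avro_type, *, required=False, required_default=False, default=None):
    """Build an Avro field entry from the three options (illustrative only)."""
    # required=False makes the field nullable by wrapping it in a union.
    field_type = avro_type if required else ["null", avro_type]
    entry = {"name": name, "type": field_type}
    # required_default decides whether a "default" attribute is emitted at all.
    if required_default:
        entry["default"] = default
    return entry


fields = [
    avro_field("name", "string", required=True),
    avro_field("age", "int", required_default=True, default=None),
]
print(json.dumps(fields))
# [{"name": "name", "type": "string"}, {"name": "age", "type": ["null", "int"], "default": null}]
```

Separating `required_default` from `default` lets a schema state an explicit null default, which a single `default=None` argument could not distinguish from "no default".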

C++

  • Fix the issue that the C++ client cannot be built on Windows. PR-10363
  • This PR puts PULSAR_PUBLIC before the variable type and keeps the LIB_NAME as the shared library's name (for example, removing the dll suffix).
  • Fix the issue that a paused zero queue consumer pre-fetches messages. PR-10036
  • Previously, zero queue consumers (the consumer's receiver queue size is 0) pre-fetched messages after pauseMessageListener was called. This was because ConsumerImpl::increaseAvailablePermits did not check the boolean variable messageListenerRunning_, which became false after pauseMessageListener was called. Therefore, after the zero queue consumer was paused, it still sent the FLOW command to pre-fetch a message to its internal unbounded queue incomingMessages_.
  • This PR fixes this issue and makes the following changes:
  • Add the check for messageListenerRunning_ in increaseAvailablePermits method and make the implementation consistent with Java client's ConsumerImpl#increaseAvailablePermits. Change the type of availablePermits_ to std::atomic_int.
  • Add the increaseAvailablePermits invocation in resumeMessageListener to send FLOW command after consumer resumes since pauseMessageListener does not prefetch messages anymore.
  • Fix the issue of segmentation fault when getting a topic name from the received message ID. PR-10006
  • Previously, the C++ client supported getting a topic name from both the received message and its message ID. However, for a consumer that subscribed to a non-partitioned topic, getting a topic name from the received message ID caused a segmentation fault.
  • This PR uses setTopicName for every single message when a consumer receives a batch and adds related tests for all types of consumers (including ConsumerImpl, MultiTopicsConsumerImpl, and PartitionedConsumerImpl).
  • Fix the issue of the SinglePartitionMessageRouter always picking the same partition. PR-9702
  • Previously, the SinglePartitionMessageRouter was supposed to pick a random partition for a given producer and stick with it. The problem was that the C rand() call always used the same default seed, which caused multiple processes to deterministically pick the same partition. This PR fixes this issue.
  • Reduce log level for an ack-grouping tracker. PR-10094
  • Previously, the warning log occurred when the ACK grouping tracker tried to send ACKs while the connection was closed.
  • This PR changes the log level to debug when the connection is not ready for AckGroupingTrackerEnabled::flush.
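
The SinglePartitionMessageRouter problem (PR-9702) comes down to seeding: a generator that starts from the same seed in every process produces the same first value everywhere. The sketch below is a Python analogy with hypothetical function names. It does not show how the C++ client was changed, only why a fixed seed defeats random partition selection.

```python
import random


def pick_partition_fixed_seed(num_partitions: int) -> int:
    # Every caller builds the same generator state, so all of them agree.
    return random.Random(0).randrange(num_partitions)


def pick_partition_entropy_seed(num_partitions: int) -> int:
    # Random() with no argument seeds itself from OS entropy (or the clock).
    return random.Random().randrange(num_partitions)


# Five simulated "processes" with a fixed seed all land on one partition.
picks = {pick_partition_fixed_seed(16) for _ in range(5)}
print(len(picks))  # 1
```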

WebSocket

  • Optimize URL token param value. PR-10187
  • This PR removes the Bearer prefix requirement for the token param value of the WebSocket URL.
  • Make the browser client support the token authentication. PR-9886
  • Previously, the WebSocket client used the HTTP request header to transport the authentication params, but the browser JavaScript WebSocket client could not add new headers.
  • This PR uses the query param token to transport the authentication token for the browser JavaScript WebSocket client.
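
As a rough illustration of the two WebSocket changes, the helper below builds a producer URL that carries the token as a query parameter, without a Bearer prefix. It is a sketch under assumptions: the function name, host, port, topic, and token value are placeholders, and the `/ws/v2/producer/persistent/...` path follows the layout of the Pulsar WebSocket API as commonly documented.

```python
from urllib.parse import quote, urlencode


def websocket_producer_url(host, tenant, namespace, topic, token, port=8080):
    """Build a WebSocket producer URL with the token as a query parameter."""
    # Percent-encode each path segment separately so "/" cannot leak in.
    path = "/".join(quote(part, safe="") for part in (tenant, namespace, topic))
    # The raw token is passed as-is (no "Bearer " prefix), URL-encoded.
    query = urlencode({"token": token})
    return f"ws://{host}:{port}/ws/v2/producer/persistent/{path}?{query}"


print(websocket_producer_url("localhost", "public", "default", "my-topic", "abc.def"))
# ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic?token=abc.def
```

A browser client could pass a URL of this shape straight to its WebSocket constructor, since no custom request headers are needed.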

Function and connector

  • Allow customizable function logging. PR-10389
  • Previously, the function log configuration was in the jar package and could not be dynamically customized.
  • This PR moves the function log configuration file to the configuration directory, where it can be customized.
  • Pass through record properties from Pulsar sources. PR-9943
  • Fix the issue of the time unit in Pulsar Go functions. PR-10160
  • This PR changes the time unit of avg process latency from ns to ms.
  • Fix the issue that the Kinesis sink did not try to resend messages. PR-10420
  • Previously, when the Kinesis sink connector failed to send a message, it did not retry. In this case, if retainOrdering was enabled, it would lead to subsequent messages not being sent.
  • This PR adds retry logic for the Kinesis sink connector. A message that fails to send is retried.
  • Fix the issue of null error messages in the onFailure exception in the Kinesis sink. PR-10416
  • Previously, if the Kinesis producer failed to send a message, the error message in the onFailure exception was null.
  • This PR extracts the UserRecordFailedException to show the real error messages.

Tiered storage

  • Prevent class loader leak and restore offloader directory override. PR-9878
  • Previously, there was a class loader leak. This PR updates the PulsarService and the PulsarConnectorCache classes to use a map from directory strings to offloaders.
  • Add logs for cleanup of offloaded data operation. PR-9852
  • Previously, the operation that cleans up offloaded data lacked logs, making it hard for users to analyze the reason for tiered storage data loss.
  • This PR adds some logs for the cleanup of offloaded data operation.

Get involved

To get started, you can download Pulsar directly or spin up a Pulsar cluster on StreamNative Cloud with a free 30-day trial, which ships the Pulsar 2.7.2 changes. We also offer technical consulting and expert training to help get your organization started. As always, we are highly responsive to your feedback. Feel free to contact us with any questions. We look forward to hearing from you. Stay tuned for the next Pulsar release!

Yong Zhang
Yong Zhang is an Apache Pulsar committer. He works as a software engineer at StreamNative.
Yu Liu
Yu Liu is an Apache Pulsar PMC member and a content strategist from StreamNative.
