Announcing: The Pulsar PMC Published The 2020 Apache Pulsar User Survey Report!

Overview
Get started
Install and upgrade
Configure
Overview
Pulsar core
ZooKeeper
BookKeeper
Broker
KoP
Tiered storage
Pulsar Functions
WebSocket
Proxy
Control center
Tool
Secure
Manage and monitor
Connect
Process
Release notes

Configure KoP

KoP (Kafka on Pulsar) runs as a plugin of Pulsar, it allows you to use Pulsar as the infrastructure without modifying various applications and services based on Kafka API.

After installing StreamNative Platform, you can use KoP directly without extra configuration. If you want to change the configuration, edit the ${PLATFORM_HOME}/etc/pulsar/broker.conf or ${PLATFORM_HOME}/etc/pulsar/standalone.conf file.

You can configure the following properties for KoP.

KoP configuration

The following configurations can be changed and set into the StreamNative Platform broker configuration file.

Must configuration

messagingProtocols and protocolHandlerDirectory must be configured.

Property Set it to the following value Default value
messagingProtocols kafka null
protocolHandlerDirectory Location of KoP NAR file ${PLATFORM_HOME}/share/java/pulsar/protocols

In StreamNative Platform, the default values are set as:

```
messagingProtocols=kafka
protocolHandlerDirectory=share/java/pulsar/protocols
```

Kafka listeners

|listeners| ListenersProp for Kafka service(host should follow the advertisedAddress) |PLAINTEXT://localhost:9092,SSL://localhost:9093|

In StreamNative Platform broker configuration file, if listeners is not configured, the default value is PLAINTEXT://${advertisedAddress}:9092.

If you want to change the default value, it can be : listeners=PLAINTEXT://host_address:9092 or listeners=PLAINTEXT://host_address:9092,SSL://host_address:9093.

Please note the host_address in above listeners configuration should be the same value of advertisedAddress in the configuration file.

Kafka secure configuration

You can see KoP Secure for more details

All KoP configuration

Name Description Default
messagingProtocols The messaging Protocols that are available when loaded by Pulsar Broker kafka
listeners ListenersProp for Kafka service(host should follow the advertisedAddress) PLAINTEXT://localhost:9092,SSL://localhost:9093
kafkaTenant Kafka on Pulsar Broker tenant public
kafkaNamespace Kafka on Pulsar Broker namespace default
kafkaMetadataTenant The tenant used for storing Kafka metadata topics public
kafkaMetadataNamespace The namespace used for storing Kafka metadata topics __kafka
enableGroupCoordinator Flag to enable group coordinator true
groupMinSessionTimeoutMs The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heart beating, which can overwhelm broker resources. 6000
groupMaxSessionTimeoutMs The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats at the cost of a longer time to detect failures. 300000
groupInitialRebalanceDelayMs The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins 3000
offsetsTopicCompressionCodec Compression codec for the offsets topic - compression may be used to achieve "atomic" commits NONE
offsetMetadataMaxSize The maximum size in Bytes for a metadata entry associated with an offset commit 4096
offsetsRetentionMinutes Offsets older than this retention period will be discarded, default 7 days 10080
offsetsRetentionCheckIntervalMs Frequency at which to check for stale offsets 600000
offsetsTopicNumPartitions Number of partitions for the offsets topic 8
kopSslProtocol Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol TLS
kopSslProvider Kafka ssl configuration map with: SSL_PROVIDER_CONFIG = ssl.provider
kopSslCipherSuites Kafka ssl configuration map with: SSL_CIPHER_SUITES_CONFIG = ssl.cipher.suites
kopSslEnabledProtocols Kafka ssl configuration map with: SSL_ENABLED_PROTOCOLS_CONFIG = ssl.enabled.protocols TLSv1.2,TLSv1.1,TLSv1
kopSslKeystoreType Kafka ssl configuration map with: SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type JKS
kopSslKeystoreLocation Kafka ssl configuration map with: SSL_KEYSTORE_LOCATION_CONFIG = ssl.keystore.location
kopSslKeystorePassword Kafka ssl configuration map with: SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password
kopSslTruststoreType Kafka ssl configuration map with: SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type JKS
kopSslTruststoreLocation Kafka ssl configuration map with: SSL_TRUSTSTORE_LOCATION_CONFIG = ssl.truststore.location
kopSslTruststorePassword Kafka ssl configuration map with: SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password
kopSslKeymanagerAlgorithm Kafka ssl configuration map with: SSL_KEYMANAGER_ALGORITHM_CONFIG = ssl.keymanager.algorithm SunX509
kopSslTrustmanagerAlgorithm Kafka ssl configuration map with: SSL_TRUSTMANAGER_ALGORITHM_CONFIG = ssl.trustmanager.algorithm SunX509
kopSslSecureRandomImplementation Kafka ssl configuration map with: SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = ssl.secure.random.implementation
saslAllowedMechanisms Supported SASL mechanisms exposed by broker

Log level

In StreamNative Platform, you can set KoP's log level at ${PLATFORM_HOME}/etc/pulsar/log4j2.yaml file.

Example

Logger:
    - name: io.streamnative.pulsar.handlers.kop
    level: warn
    additivity: false
    AppenderRef:
        - ref: Console

More information