Configuration - Apache Celeborn™
Skip to content
Configuration Guide
This documentation contains Celeborn configuration details and a tuning guide.
Important Configurations
Environment Variables
CELEBORN_WORKER_MEMORY=4g
CELEBORN_WORKER_OFFHEAP_MEMORY=24g
Celeborn workers tend to improve performance by using off-heap buffers.
Off-heap memory requirement can be estimated as below:
numDirs = `celeborn.worker.storage.dirs` # the amount of directory will be used by Celeborn storage
bufferSize = `celeborn.worker.flusher.buffer.size` # the amount of memory will be used by a single flush buffer
off-heap-memory = (disk buffer * disks) + network memory # the disk buffer is a logical memory region that stores shuffle data received from network
# shuffle data will be flushed to disks through write tasks
# the amount of disk buffer can be set to 1GB or larger for each disk according to the difference of your disk speed and network speed
For example, if a Celeborn worker give each disk 1GiB memory and the buffer size is set to 256 KB.
Celeborn worker can support up to 4096 concurrent write tasks for each disk.
If this worker has 10 disks, the offheap memory should be set to 12GB.
Network memory will be consumed when netty reads from a TCP channel, there will need some extra
memory. Empirically, Celeborn worker off-heap memory should be set to
((disk buffer * disks) * 1.2)
All Configurations
Master
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.cluster.name
default
false
Celeborn cluster name.
0.5.0
celeborn.container.info.provider
org.apache.celeborn.server.common.container.DefaultContainerInfoProvider
false
ContainerInfoProvider class name. Default class is
org.apache.celeborn.server.common.container.DefaultContainerInfoProvider
0.6.0
celeborn.dynamicConfig.refresh.interval
120s
false
Interval for refreshing the corresponding dynamic config periodically.
0.4.0
celeborn.dynamicConfig.store.backend

false
Store backend for dynamic config service. The store backend can be specified in two ways: - Using the short name of the store backend defined in the implementation of
ConfigStore#getName
whose return value can be mapped to the corresponding backend implementation. Available options: FS, DB. - Using the service class name of the store backend implementation. If not provided, it means that dynamic configuration is disabled.
0.4.0
celeborn.dynamicConfig.store.db.fetch.pageSize
1000
false
The page size for db store to query configurations.
0.5.0
celeborn.dynamicConfig.store.db.hikari.connectionTimeout
30s
false
The connection timeout that a client will wait for a connection from the pool for db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.driverClassName
false
The jdbc driver class name of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.idleTimeout
600s
false
The idle timeout that a connection is allowed to sit idle in the pool for db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.jdbcUrl
false
The jdbc url of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.maxLifetime
1800s
false
The maximum lifetime of a connection in the pool for db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.maximumPoolSize
false
The maximum pool size of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.password
false
The password of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.username
false
The username of db store backend.
0.5.0
celeborn.dynamicConfig.store.fs.path

false
The path of dynamic config file for fs store backend. The file format should be yaml. The default path is
${CELEBORN_CONF_DIR}/dynamicConfig.yaml
0.5.0
celeborn.http.auth.bypass.api.paths
false
A comma-separated list of additional API paths that bypass authentication. The path must match exactly and is case-sensitive. Wildcards not accepted.
0.7.0
celeborn.internal.port.enabled
false
false
Whether to create a internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication.
0.5.0
celeborn.logConf.enabled
false
false
When
true
, log the CelebornConf for debugging purposes.
0.5.0
celeborn.master.allowWorkerHostPattern

false
Pattern of worker host that allowed to register with the master. If not set, all workers are allowed to register.
0.6.0
celeborn.master.denyWorkerHostPattern

false
Pattern of worker host that denied to register with the master. If not set, no workers are denied to register.
0.6.0
celeborn.master.dfs.expireDirs.timeout
1h
false
The timeout for an expired dirs to be deleted on dfs like HDFS, S3, OSS.
0.6.0
celeborn.master.estimatedPartitionSize.initialSize
64mb
false
Initial partition size for estimation, it will change according to runtime stats.
0.3.0
celeborn.shuffle.initialEstimatedPartitionSize
celeborn.master.estimatedPartitionSize.maxSize

false
Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2.
0.4.1
celeborn.master.estimatedPartitionSize.minSize
8mb
false
Ignore partition size smaller than this configuration of partition size for estimation.
0.3.0
celeborn.shuffle.minPartitionSizeToEstimate
celeborn.master.estimatedPartitionSize.update.initialDelay
5min
false
Initial delay time before start updating partition size for estimation.
0.3.0
celeborn.shuffle.estimatedPartitionSize.update.initialDelay
celeborn.master.estimatedPartitionSize.update.interval
10min
false
Interval of updating partition size for estimation.
0.3.0
celeborn.shuffle.estimatedPartitionSize.update.interval
celeborn.master.excludeWorker.autoReleaseHighWorkLoadEnabled
false
false
Whether to release workers with high workload in excluded worker list.
0.7.0
celeborn.master.excludeWorker.autoReleaseHighWorkLoadRatioThreshold
0.3
false
Whenever the number of worker with high workload exceeds this ratio, master will release worker with high workload in excluded worker list. If this value is set to 0, such workers will never be excluded.
0.7.0
celeborn.master.excludeWorker.unhealthyDiskRatioThreshold
1.0
false
Max ratio of unhealthy disks for excluding worker, when unhealthy disk is larger than max unhealthy count, master will exclude worker. If this value is set to 1, master will exclude worker of which disks are all unhealthy.
0.6.0
celeborn.master.heartbeat.application.timeout
300s
false
Application heartbeat timeout.
0.3.0
celeborn.application.heartbeat.timeout
celeborn.master.heartbeat.worker.timeout
120s
false
Worker heartbeat timeout.
0.3.0
celeborn.worker.heartbeat.timeout
celeborn.master.host

false
Hostname for master to bind.
0.2.0
celeborn.master.http.auth.administers
false
A comma-separated list of users who have admin privileges, Note, when celeborn.master.http.auth.supportedSchemes is not set, everyone is treated as administrator.
0.6.0
celeborn.master.http.auth.basic.provider
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
false
User-defined password authentication implementation of org.apache.celeborn.spi.authentication.PasswdAuthenticationProvider
0.6.0
celeborn.master.http.auth.bearer.provider
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
false
User-defined token authentication implementation of org.apache.celeborn.spi.authentication.TokenAuthenticationProvider
0.6.0
celeborn.master.http.auth.supportedSchemes
false
A comma-separated list of master http auth supported schemes.
SPNEGO: Kerberos/GSSAPI authentication.
BASIC: User-defined password authentication, the concreted implementation is configurable via
celeborn.master.http.auth.basic.provider
BEARER: User-defined bearer token authentication, the concreted implementation is configurable via
celeborn.master.http.auth.bearer.provider
0.6.0
celeborn.master.http.host

false
Master's http host.
0.4.0
celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host
celeborn.master.http.idleTimeout
30s
false
Master http server idle timeout.
0.5.0
celeborn.master.http.maxWorkerThreads
200
false
Maximum number of threads in the master http worker thread pool.
0.5.0
celeborn.master.http.port
9098
false
Master's http port.
0.4.0
celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port
celeborn.master.http.proxy.client.ip.header
X-Real-IP
false
The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that, because the header value can be specified to any IP address, so it will not be used for authentication.
0.6.0
celeborn.master.http.spnego.keytab

false
The keytab file for SPNego authentication.
0.6.0
celeborn.master.http.spnego.principal

false
SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication.
0.6.0
celeborn.master.http.ssl.disallowed.protocols
SSLv2,SSLv3
false
SSL versions to disallow.
0.6.0
celeborn.master.http.ssl.enabled
false
false
Set this to true for using SSL encryption in http server.
0.6.0
celeborn.master.http.ssl.include.ciphersuites
false
A comma-separated list of include SSL cipher suite names.
0.6.0
celeborn.master.http.ssl.keystore.algorithm

false
SSL certificate keystore algorithm.
0.6.0
celeborn.master.http.ssl.keystore.password

false
SSL certificate keystore password.
0.6.0
celeborn.master.http.ssl.keystore.path

false
SSL certificate keystore location.
0.6.0
celeborn.master.http.ssl.keystore.type

false
SSL certificate keystore type.
0.6.0
celeborn.master.http.stopTimeout
5s
false
Master http server stop timeout.
0.5.0
celeborn.master.internal.port
8097
false
Internal port on the master where both workers and other master nodes connect.
0.5.0
celeborn.master.persist.workerNetworkLocation
false
false
0.6.0
celeborn.master.port
9097
false
Port for master to bind.
0.2.0
celeborn.master.rackResolver.refresh.interval
30s
false
Interval for refreshing the node rack information periodically.
0.5.0
celeborn.master.send.applicationMeta.threads
false
Number of threads used by the Master to send ApplicationMeta to Workers.
0.5.0
celeborn.master.slot.assign.extraSlots
false
Extra slots number when master assign slots. Provided enough workers are available.
0.3.0
celeborn.slots.assign.extraSlots
celeborn.master.slot.assign.interruptionAware
false
false
If this is set to true, Celeborn master will prioritize partition placement on workers that are not in scope for maintenance soon.
0.7.0
celeborn.master.slot.assign.interruptionAware.threshold
50
false
This controls what percentage of hosts would be selected for slot selection in the first iteration of creating partitions. Default is 50%.
0.7.0
celeborn.master.slot.assign.loadAware.diskGroupGradient
0.1
false
This value means how many more workload will be placed into a faster disk group than a slower group.
0.3.0
celeborn.slots.assign.loadAware.diskGroupGradient
celeborn.master.slot.assign.loadAware.fetchTimeWeight
1.0
false
Weight of average fetch time when calculating ordering in load-aware assignment strategy
0.3.0
celeborn.slots.assign.loadAware.fetchTimeWeight
celeborn.master.slot.assign.loadAware.flushTimeWeight
0.0
false
Weight of average flush time when calculating ordering in load-aware assignment strategy
0.3.0
celeborn.slots.assign.loadAware.flushTimeWeight
celeborn.master.slot.assign.loadAware.numDiskGroups
false
This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created.
0.3.0
celeborn.slots.assign.loadAware.numDiskGroups
celeborn.master.slot.assign.maxWorkers
10000
false
Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see
celeborn.client.slot.assign.maxWorkers
0.3.1
celeborn.master.slot.assign.minWorkers
100
false
Min workers that slots of one shuffle should be allocated on. Provided enough workers are available.
0.6.0
celeborn.master.slot.assign.policy
ROUNDROBIN
false
Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when
HDFS
is enabled in
celeborn.storage.availableTypes
0.3.0
celeborn.slots.assign.policy
celeborn.master.userResourceConsumption.metrics.enabled
false
false
Whether to enable resource consumption metrics.
0.6.0
celeborn.master.userResourceConsumption.update.interval
30s
false
Time length for a window about compute user resource consumption.
0.3.0
celeborn.master.workerUnavailableInfo.expireTimeout
1800s
false
Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration.
0.3.1
celeborn.quota.cluster.enabled
true
false
Whether to enable cluster-level quota.
0.6.0
celeborn.quota.enabled
true
false
When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user.
0.2.0
celeborn.quota.tenant.enabled
true
false
Whether to enable tenant-level quota.
0.6.0
celeborn.quota.user.enabled
true
false
Whether to enable user-level quota.
0.6.0
celeborn.redaction.regex
(?i)secret
password
token
access[.]key
false
celeborn.storage.availableTypes
HDD
false
Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical.
0.3.0
celeborn.storage.activeTypes
celeborn.storage.hdfs.dir

false
HDFS base directory for Celeborn to store shuffle data.
0.2.0
celeborn.storage.hdfs.kerberos.keytab

false
Kerberos keytab file path for HDFS storage connection.
0.3.2
celeborn.storage.hdfs.kerberos.principal

false
Kerberos principal for HDFS storage connection.
0.3.2
celeborn.storage.oss.access.key

false
OSS access key for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.dir

false
OSS base directory for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.endpoint

false
OSS endpoint for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.ignore.credentials
true
false
Whether to skip oss credentials, disable this config to support jindo sdk .
0.6.0
celeborn.storage.oss.secret.key

false
OSS secret key for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.dir

false
S3 base directory for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.endpoint.region

false
S3 endpoint for Celeborn to store shuffle data.
0.6.0
celeborn.tags.enabled
true
false
Whether to enable tags for workers.
0.6.0
celeborn.tags.preferClientTagsExpr
false
true
When
true
, prefer the tags expression provided by the client over the tags expression provided by the master.
0.6.0
celeborn.tags.tagsExpr
true
Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example,
prod,high-io
filters workers that have both the
prod
and
high-io
tags.
0.6.0
Apart from these, the following properties are also available for enable master HA:
Master HA
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.master.ha.enabled
false
false
When true, master nodes run as Raft cluster mode.
0.3.0
celeborn.ha.enabled
celeborn.master.ha.node..host

false
Host to bind of master node
in HA mode.
0.3.0
celeborn.ha.master.node..host
celeborn.master.ha.node..internal.port
8097
false
Internal port for the workers and other masters to bind to a master node
in HA mode.
0.5.0
celeborn.master.ha.node..port
9097
false
Port to bind of master node
in HA mode.
0.3.0
celeborn.ha.master.node..port
celeborn.master.ha.node..ratis.port
9872
false
Ratis port to bind of master node
in HA mode.
0.3.0
celeborn.ha.master.node..ratis.port
celeborn.master.ha.ratis.raft.rpc.type
netty
false
RPC type for Ratis, available options: netty, grpc.
0.3.0
celeborn.ha.master.ratis.raft.rpc.type
celeborn.master.ha.ratis.raft.server.storage.dir
/tmp/ratis
false
Root storage directory to hold RaftServer data.
0.3.0
celeborn.ha.master.ratis.raft.server.storage.dir
celeborn.master.ha.ratis.raft.server.storage.startup.option
RECOVER
false
Startup option of RaftServer storage. Available options: RECOVER, FORMAT.
0.5.0
Worker
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.cluster.name
default
false
Celeborn cluster name.
0.5.0
celeborn.container.info.provider
org.apache.celeborn.server.common.container.DefaultContainerInfoProvider
false
ContainerInfoProvider class name. Default class is
org.apache.celeborn.server.common.container.DefaultContainerInfoProvider
0.6.0
celeborn.dynamicConfig.refresh.interval
120s
false
Interval for refreshing the corresponding dynamic config periodically.
0.4.0
celeborn.dynamicConfig.store.backend

false
Store backend for dynamic config service. The store backend can be specified in two ways: - Using the short name of the store backend defined in the implementation of
ConfigStore#getName
whose return value can be mapped to the corresponding backend implementation. Available options: FS, DB. - Using the service class name of the store backend implementation. If not provided, it means that dynamic configuration is disabled.
0.4.0
celeborn.dynamicConfig.store.db.fetch.pageSize
1000
false
The page size for db store to query configurations.
0.5.0
celeborn.dynamicConfig.store.db.hikari.connectionTimeout
30s
false
The connection timeout that a client will wait for a connection from the pool for db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.driverClassName
false
The jdbc driver class name of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.idleTimeout
600s
false
The idle timeout that a connection is allowed to sit idle in the pool for db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.jdbcUrl
false
The jdbc url of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.maxLifetime
1800s
false
The maximum lifetime of a connection in the pool for db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.maximumPoolSize
false
The maximum pool size of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.password
false
The password of db store backend.
0.5.0
celeborn.dynamicConfig.store.db.hikari.username
false
The username of db store backend.
0.5.0
celeborn.dynamicConfig.store.fs.path

false
The path of dynamic config file for fs store backend. The file format should be yaml. The default path is
${CELEBORN_CONF_DIR}/dynamicConfig.yaml
0.5.0
celeborn.http.auth.bypass.api.paths
false
A comma-separated list of additional API paths that bypass authentication. The path must match exactly and is case-sensitive. Wildcards not accepted.
0.7.0
celeborn.internal.port.enabled
false
false
Whether to create a internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication.
0.5.0
celeborn.logConf.enabled
false
false
When
true
, log the CelebornConf for debugging purposes.
0.5.0
celeborn.master.endpoints
:9097
false
Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided by celeborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses
org.apache.celeborn.common.client.StaticMasterEndpointResolver
which take static master endpoints as input. Allowed pattern:
:[,:]*
, e.g.
clb1:9097,clb2:9098,clb3:9099
. If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver.
0.2.0
celeborn.master.endpoints.resolver
org.apache.celeborn.common.client.StaticMasterEndpointResolver
false
Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that provided resolver class should be present in the classpath.
0.5.2
celeborn.master.estimatedPartitionSize.minSize
8mb
false
Ignore partition size smaller than this configuration of partition size for estimation.
0.3.0
celeborn.shuffle.minPartitionSizeToEstimate
celeborn.master.internal.endpoints
:8097
false
Endpoints of master nodes just for celeborn workers to connect, allowed pattern is:
:[,:]*
, e.g.
clb1:8097,clb2:8097,clb3:8097
. If the port is omitted, 8097 will be used.
0.5.0
celeborn.redaction.regex
(?i)secret
password
token
access[.]key
false
celeborn.shuffle.chunk.size
8m
false
Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch.
0.2.0
celeborn.shuffle.sortPartition.block.compactionFactor
0.25
false
Combine sorted shuffle blocks such that size of compacted shuffle block does not exceed compactionFactor * celeborn.shuffle.chunk.size
0.4.2
celeborn.storage.availableTypes
HDD
false
Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical.
0.3.0
celeborn.storage.activeTypes
celeborn.storage.hdfs.dir

false
HDFS base directory for Celeborn to store shuffle data.
0.2.0
celeborn.storage.hdfs.kerberos.keytab

false
Kerberos keytab file path for HDFS storage connection.
0.3.2
celeborn.storage.hdfs.kerberos.principal

false
Kerberos principal for HDFS storage connection.
0.3.2
celeborn.storage.oss.access.key

false
OSS access key for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.dir

false
OSS base directory for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.endpoint

false
OSS endpoint for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.ignore.credentials
true
false
Whether to skip oss credentials, disable this config to support jindo sdk .
0.6.0
celeborn.storage.oss.secret.key

false
OSS secret key for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.dir

false
S3 base directory for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.endpoint.region

false
S3 endpoint for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.mpu.baseDelay
100ms
false
S3 MPU base sleep time (milliseconds) for retryable exceptions.
0.6.0
celeborn.storage.s3.mpu.maxBackoff
20s
false
S3 MPU max sleep time (milliseconds) for retryable exceptions.
0.6.0
celeborn.storage.s3.mpu.maxRetries
false
S3 MPU upload max retries.
0.6.0
celeborn.worker.activeConnection.max

false
If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots.
0.3.1
celeborn.worker.applicationRegistry.cache.size
10000
false
Cache size of the application registry on Workers.
0.5.0
celeborn.worker.bufferStream.threadsPerMountpoint
false
Threads count for read buffer per mount point.
0.3.0
celeborn.worker.clean.threads
64
false
Thread number of worker to clean up expired shuffle keys.
0.3.2
celeborn.worker.closeIdleConnections
false
false
Whether worker will close idle connections.
0.2.0
celeborn.worker.commitFiles.check.interval
100
false
Time length for a window about checking whether commit shuffle data files finished.
0.6.0
celeborn.worker.commitFiles.fsync
false
false
Whether to fsync (fdatasync) shuffle data when committing. When enabled, each partition file is fsynced to disk before the commit completes ensuring committed data survives OS crashes, hard reboots etc. Enabling ensures durability but can add some latency to commit times.
0.7.0
celeborn.worker.commitFiles.threads
32
false
Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least
128
when
HDFS
is enabled in
celeborn.storage.availableTypes
0.3.0
celeborn.worker.commit.threads
celeborn.worker.commitFiles.timeout
120s
false
Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least
240s
when
HDFS
is enabled in
celeborn.storage.availableTypes
0.3.0
celeborn.worker.shuffle.commit.timeout
celeborn.worker.congestionControl.check.interval
10ms
false
Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true.
0.3.2
celeborn.worker.congestionControl.diskBuffer.high.watermark
9223372036854775807b
false
If the total bytes in disk buffer exceeds this configure, will start to congest users whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark
0.3.0
celeborn.worker.congestionControl.high.watermark
celeborn.worker.congestionControl.diskBuffer.low.watermark
9223372036854775807b
false
Will stop congest users if the total pending bytes of disk buffer is lower than this configuration
0.3.0
celeborn.worker.congestionControl.low.watermark
celeborn.worker.congestionControl.enabled
false
false
Whether to enable congestion control or not.
0.3.0
celeborn.worker.congestionControl.sample.time.window
10s
false
The worker holds a time sliding list to calculate users' produce/consume rate
0.3.0
celeborn.worker.congestionControl.user.inactive.interval
10min
false
How long will consider this user is inactive if it doesn't send data
0.3.0
celeborn.worker.congestionControl.userProduceSpeed.high.watermark
9223372036854775807b
false
For those users that produce byte speeds greater than this configuration, start congestion for these users
0.6.0
celeborn.worker.congestionControl.userProduceSpeed.low.watermark
9223372036854775807b
false
For those users that produce byte speeds less than this configuration, stop congestion for these users
0.6.0
celeborn.worker.congestionControl.workerProduceSpeed.high.watermark
9223372036854775807b
false
Start congestion If worker total produce speed greater than this configuration
0.6.0
celeborn.worker.congestionControl.workerProduceSpeed.low.watermark
9223372036854775807b
false
Stop congestion If worker total produce speed less than this configuration
0.6.0
celeborn.worker.decommission.checkInterval
30s
false
The wait interval of checking whether all the shuffle expired during worker decommission
0.4.0
celeborn.worker.decommission.forceExitTimeout
6h
false
The wait time of waiting for all the shuffle expire during worker decommission.
0.4.0
celeborn.worker.directMemoryRatioForMemoryFileStorage
0.0
false
Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default.
0.5.0
celeborn.worker.directMemoryRatioForReadBuffer
0.35
false
Max ratio of direct memory for read buffer
0.2.0
celeborn.worker.directMemoryRatioToMergeBuffer
0.4
false
If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer
0.6.2
celeborn.worker.directMemoryRatioToPauseReceive
0.85
false
If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients.
0.2.0
celeborn.worker.directMemoryRatioToPauseReplicate
0.95
false
If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive.
0.2.0
celeborn.worker.directMemoryRatioToResume
0.7
false
If direct memory usage is less than this limit, worker will resume.
0.2.0
celeborn.worker.disk.clean.threads
false
Thread number of worker to clean up directories of expired shuffle keys on disk.
0.3.2
celeborn.worker.fetch.heartbeat.enabled
false
false
enable the heartbeat from worker to client when fetching data
0.3.0
celeborn.worker.fetch.io.threads

false
Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread.
0.2.0
celeborn.worker.fetch.port
false
Server port for Worker to receive fetch data request from ShuffleClient.
0.2.0
celeborn.worker.flusher.buffer.size
256k
false
Size of buffer used by a single flusher.
0.2.0
celeborn.worker.flusher.diskTime.slidingWindow.size
20
false
The size of sliding windows used to calculate statistics about flushed time and count.
0.3.0
celeborn.worker.flusher.avgFlushTime.slidingWindow.size
celeborn.worker.flusher.hdd.threads
false
Flusher's thread count per disk used for write data to HDD disks.
0.2.0
celeborn.worker.flusher.hdfs.buffer.size
4m
false
Size of buffer used by a HDFS flusher.
0.3.0
celeborn.worker.flusher.hdfs.threads
false
Flusher's thread count used for write data to HDFS.
0.2.0
celeborn.worker.flusher.oss.buffer.size
6m
false
Size of buffer used by a OSS flusher.
0.6.0
celeborn.worker.flusher.oss.threads
false
Flusher's thread count used for write data to OSS.
0.6.0
celeborn.worker.flusher.s3.buffer.size
6m
false
Size of buffer used by a S3 flusher.
0.6.0
celeborn.worker.flusher.s3.threads
false
Flusher's thread count used for write data to S3.
0.6.0
celeborn.worker.flusher.shutdownTimeout
3s
false
Timeout for a flusher to shutdown.
0.2.0
celeborn.worker.flusher.ssd.threads
16
false
Flusher's thread count per disk used for write data to SSD disks.
0.2.0
celeborn.worker.flusher.threads
16
false
Flusher's thread count per disk for unknown-type disks.
0.2.0
celeborn.worker.graceful.shutdown.checkSlotsFinished.interval
1s
false
The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown
0.2.0
celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout
480s
false
The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown.
0.2.0
celeborn.worker.graceful.shutdown.dbDeleteFailurePolicy
IGNORE
false
Policy for handling DB delete failures during graceful shutdown. THROW: throw exception, EXIT: trigger graceful shutdown, IGNORE: log error and continue (default).
0.7.0
celeborn.worker.graceful.shutdown.enabled
false
false
When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed.
0.2.0
celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout
120s
false
The wait time of waiting for sorting partition files during worker graceful shutdown.
0.2.0
celeborn.worker.graceful.shutdown.recoverDbBackend
ROCKSDB
false
Specifies a disk-based store used in local db. ROCKSDB or LEVELDB (deprecated).
0.4.0
celeborn.worker.graceful.shutdown.recoverPath
/recover
false
The path to store DB.
0.2.0
celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval
5s
false
Interval for a Celeborn worker to flush committed file infos into DB.
0.3.1
celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync
false
false
Whether to call sync method to save committed file infos into DB to handle OS crash.
0.3.1
celeborn.worker.graceful.shutdown.timeout
600s
false
The worker's graceful shutdown timeout time.
0.2.0
celeborn.worker.hdfs.replication.factor
false
HDFS replication factor for shuffle files.
0.7.0
celeborn.worker.http.auth.administers
false
A comma-separated list of users who have admin privileges, Note, when celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as administrator.
0.6.0
celeborn.worker.http.auth.basic.provider
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
false
User-defined password authentication implementation of org.apache.celeborn.common.authentication.PasswdAuthenticationProvider
0.6.0
celeborn.worker.http.auth.bearer.provider
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
false
User-defined token authentication implementation of org.apache.celeborn.common.authentication.TokenAuthenticationProvider
0.6.0
celeborn.worker.http.auth.supportedSchemes
false
A comma-separated list of worker http auth supported schemes.
SPNEGO: Kerberos/GSSAPI authentication.
BASIC: User-defined password authentication, the concreted implementation is configurable via
celeborn.worker.http.auth.basic.provider
BEARER: User-defined bearer token authentication, the concreted implementation is configurable via
celeborn.worker.http.auth.bearer.provider
0.6.0
celeborn.worker.http.host

false
Worker's http host.
0.4.0
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host
celeborn.worker.http.idleTimeout
30s
false
Worker http server idle timeout.
0.5.0
celeborn.worker.http.maxWorkerThreads
200
false
Maximum number of threads in the worker http worker thread pool.
0.5.0
celeborn.worker.http.port
9096
false
Worker's http port.
0.4.0
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port
celeborn.worker.http.proxy.client.ip.header
X-Real-IP
false
The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that, because the header value can be specified to any IP address, so it will not be used for authentication.
0.6.0
celeborn.worker.http.spnego.keytab

false
The keytab file for SPNego authentication.
0.6.0
celeborn.worker.http.spnego.principal

false
SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication.
0.6.0
celeborn.worker.http.ssl.disallowed.protocols
SSLv2,SSLv3
false
SSL versions to disallow.
0.6.0
celeborn.worker.http.ssl.enabled
false
false
Set this to true for using SSL encryption in http server.
0.6.0
celeborn.worker.http.ssl.include.ciphersuites
false
A comma-separated list of include SSL cipher suite names.
0.6.0
celeborn.worker.http.ssl.keystore.algorithm

false
SSL certificate keystore algorithm.
0.6.0
celeborn.worker.http.ssl.keystore.password

false
SSL certificate keystore password.
0.6.0
celeborn.worker.http.ssl.keystore.path

false
SSL certificate keystore location.
0.6.0
celeborn.worker.http.ssl.keystore.type

false
SSL certificate keystore type.
0.6.0
celeborn.worker.http.stopTimeout
5s
false
Worker http server stop timeout.
0.5.0
celeborn.worker.internal.port
false
Internal server port on the Worker where the master nodes connect.
0.5.0
celeborn.worker.jvmProfiler.enabled
false
false
Turn on code profiling via async_profiler in workers.
0.5.0
celeborn.worker.jvmProfiler.localDir
false
Local file system path on worker where profiler output is saved. Defaults to the working directory of the worker process.
0.5.0
celeborn.worker.jvmProfiler.options
event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s
false
Options to pass on to the async profiler.
0.5.0
celeborn.worker.jvmQuake.check.interval
1s
false
Interval of gc behavior checking for worker jvm quake.
0.4.0
celeborn.worker.jvmQuake.dump.enabled
true
false
Whether to heap dump for the maximum GC 'deficit' during worker jvm quake.
0.4.0
celeborn.worker.jvmQuake.dump.path
/jvm-quake/dump/
false
The path of heap dump for the maximum GC 'deficit' during worker jvm quake.
0.4.0
celeborn.worker.jvmQuake.dump.threshold
30s
false
The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold.
0.4.0
celeborn.worker.jvmQuake.enabled
false
false
When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure.
0.4.0
celeborn.worker.jvmQuake.exitCode
502
false
The exit code of system kill for the maximum GC 'deficit' during worker jvm quake.
0.4.0
celeborn.worker.jvmQuake.kill.threshold
60s
false
The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action.
0.4.0
celeborn.worker.jvmQuake.runtimeWeight
5.0
false
The factor by which to multiply running JVM time, when weighing it against GCing time. 'Deficit' is accumulated as
gc_time - runtime * runtime_weight
, and is compared against threshold to determine whether to take action.
0.4.0
celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled
false
false
If this set to true, memory shuffle files will be evicted when worker is in PAUSED state. If the worker's offheap memory is not ample, set this to true and decrease
celeborn.worker.directMemoryRatioForMemoryFileStorage
will be helpful.
0.5.1
celeborn.worker.memoryFileStorage.evict.ratio
0.5
false
If memory shuffle storage usage rate is above this config, the memory storage shuffle files will evict to free memory.
0.5.1
celeborn.worker.memoryFileStorage.maxFileSize
8MB
false
Max size for a memory storage file. It must be less than 2GB.
0.5.0
celeborn.worker.monitor.disk.check.interval
30s
false
Intervals between device monitor to check disk.
0.3.0
celeborn.worker.monitor.disk.checkInterval
celeborn.worker.monitor.disk.check.timeout
30s
false
Timeout time for worker check device status.
0.3.0
celeborn.worker.disk.check.timeout
celeborn.worker.monitor.disk.checklist
readwrite,diskusage
false
Monitor type for disk, available items are: iohang, readwrite and diskusage.
0.2.0
celeborn.worker.monitor.disk.enabled
true
false
When true, worker will monitor device and report to master.
0.3.0
celeborn.worker.monitor.disk.notifyError.expireTimeout
10m
false
The expire timeout of non-critical device error. Only notify critical error when the number of non-critical errors for a period of time exceeds threshold.
0.3.0
celeborn.worker.monitor.disk.notifyError.threshold
64
false
Device monitor will only notify critical error once the accumulated valid non-critical error number exceeding this threshold.
0.3.0
celeborn.worker.monitor.disk.sys.block.dir
/sys/block
false
The directory where linux file block information is stored.
0.2.0
celeborn.worker.monitor.memory.check.interval
10ms
false
Interval of worker direct memory checking.
0.3.0
celeborn.worker.memory.checkInterval
celeborn.worker.monitor.memory.report.interval
10s
false
Interval of worker direct memory tracker reporting to log.
0.3.0
celeborn.worker.memory.reportInterval
celeborn.worker.monitor.memory.trimChannelWaitInterval
1s
false
Wait time after worker trigger channel to trim cache.
0.3.0
celeborn.worker.monitor.memory.trimFlushWaitInterval
1s
false
Wait time after worker trigger StorageManger to flush data.
0.3.0
celeborn.worker.monitor.pinnedMemory.check.enabled
true
false
If true, MemoryManager will check worker should resume by pinned memory used.
0.6.0
celeborn.worker.monitor.pinnedMemory.check.interval
10s
false
Interval of worker direct pinned memory checking, only takes effect when celeborn.network.memory.allocator.pooled and celeborn.worker.monitor.pinnedMemory.check.enabled are enabled.
0.6.0
celeborn.worker.monitor.pinnedMemory.resumeKeepTime
1s
false
Time of worker to stay in resume state after resumeByPinnedMemory
0.6.0
celeborn.worker.partition.initial.readBuffersMax
1024
false
Max number of initial read buffers
0.3.0
celeborn.worker.partition.initial.readBuffersMin
false
Min number of initial read buffers
0.3.0
celeborn.worker.partitionSorter.directMemoryRatioThreshold
0.1
false
Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. If this value is set to 0, partition files sorter will skip memory check and ServingState check.
0.2.0
celeborn.worker.pinnedMemoryRatioToResume
0.3
false
If pinned memory usage is less than this limit, worker will resume, only takes effect when celeborn.network.memory.allocator.pooled and celeborn.worker.monitor.pinnedMemory.check.enabled are enabled
0.6.0
celeborn.worker.push.heartbeat.enabled
false
false
enable the heartbeat from worker to client when pushing data
0.3.0
celeborn.worker.push.io.threads

false
Netty IO thread number of worker to handle client push data. The default threads number is the number of flush thread.
0.2.0
celeborn.worker.push.port
false
Server port for Worker to receive push data request from ShuffleClient.
0.2.0
celeborn.worker.pushdata.mergeBuffer.enabled
false
false
enable merge low utilization push data's body buffer before write
0.6.2
celeborn.worker.readBuffer.allocationWait
50ms
false
The time to wait when buffer dispatcher can not allocate a buffer.
0.3.0
celeborn.worker.readBuffer.processTimeout
600s
false
Timeout for buffer dispatcher to process a read buffer request.
0.6.2
celeborn.worker.readBuffer.target.changeThreshold
1mb
false
The target ratio for pre read memory usage.
0.3.0
celeborn.worker.readBuffer.target.ratio
0.9
false
The target ratio for read ahead buffer's memory usage.
0.3.0
celeborn.worker.readBuffer.target.updateInterval
100ms
false
The interval for memory manager to calculate new read buffer's target memory.
0.3.0
celeborn.worker.readBuffer.toTriggerReadMin
32
false
Min buffers count for map data partition to trigger read.
0.3.0
celeborn.worker.register.timeout
180s
false
Worker register timeout.
0.2.0
celeborn.worker.replicate.fastFail.duration
60s
false
If a replicate request not replied during the duration, worker will mark the replicate data request as failed. It's recommended to set at least
240s
when
HDFS
is enabled in
celeborn.storage.availableTypes
0.2.0
celeborn.worker.replicate.io.threads

false
Netty IO thread number of worker to replicate shuffle data. The default threads number is the number of flush thread.
0.2.0
celeborn.worker.replicate.port
false
Server port for Worker to receive replicate data request from other Workers.
0.2.0
celeborn.worker.replicate.randomConnection.enabled
true
false
Whether worker will create random connection to peer when replicate data. When false, worker tend to reuse the same cached TransportClient to a specific replicate worker; when true, worker tend to use different cached TransportClient. Netty will use the same thread to serve the same connection, so with more connections replicate server can leverage more netty threads
0.2.1
celeborn.worker.replicate.threads
64
false
Thread number of worker to replicate shuffle data.
0.2.0
celeborn.worker.reuse.hdfs.outputStream.enabled
false
false
Whether to enable reuse output stream on hdfs.
0.7.0
celeborn.worker.rpc.port
false
Server port for Worker to receive RPC request.
0.2.0
celeborn.worker.shuffle.partitionSplit.enabled
true
false
enable the partition split on worker side
0.3.0
celeborn.worker.partition.split.enabled
celeborn.worker.shuffle.partitionSplit.max
2g
false
Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit.
0.3.0
celeborn.worker.shuffle.partitionSplit.min
1m
false
Min size for a partition to split
0.3.0
celeborn.shuffle.partitionSplit.min
celeborn.worker.sortPartition.indexCache.expire
180s
false
PartitionSorter's cache item expire time.
0.4.0
celeborn.worker.sortPartition.indexCache.maxWeight
100000
false
PartitionSorter's cache max weight for index buffer.
0.4.0
celeborn.worker.sortPartition.prefetch.enabled
true
false
When true, partition sorter will prefetch the original partition files to page cache and reserve memory configured by
celeborn.worker.sortPartition.reservedMemoryPerPartition
to allocate a block of memory for prefetching while sorting a shuffle file off-heap with page cache for non-hdfs files. Otherwise, partition sorter seeks to position of each block and does not prefetch for non-hdfs files.
0.5.0
celeborn.worker.sortPartition.reservedMemoryPerPartition
1mb
false
Reserved memory when sorting a shuffle file off-heap.
0.3.0
celeborn.worker.partitionSorter.reservedMemoryPerPartition
celeborn.worker.sortPartition.sortTimeLogThreshold

false
When sort time exceeds this threshold, log the file id and sort duration. Set to 0 to disable logging.
0.6.2
celeborn.worker.sortPartition.threads

false
PartitionSorter's thread counts. It's recommended to set at least
64
when
HDFS
is enabled in
celeborn.storage.availableTypes
0.3.0
celeborn.worker.partitionSorter.threads
celeborn.worker.sortPartition.timeout
220s
false
Timeout for a shuffle file to sort.
0.3.0
celeborn.worker.partitionSorter.sort.timeout
celeborn.worker.storage.checkDirsEmpty.maxRetries
false
The number of retries for a worker to check if the working directory is cleaned up before registering with the master.
0.3.0
celeborn.worker.disk.checkFileClean.maxRetries
celeborn.worker.storage.checkDirsEmpty.timeout
1000ms
false
The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master.
0.3.0
celeborn.worker.disk.checkFileClean.timeout
celeborn.worker.storage.dirs

false
Directory list to store shuffle data. It's recommended to configure one directory on each disk. Storage size limit can be set for each directory. For the sake of performance, there should be no more than 2 flush threads on the same disk partition if you are using HDD, and should be 8 or more flush threads on the same disk partition if you are using SSD. For example:
dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]
0.2.0
celeborn.worker.storage.disk.reserve.ratio

false
Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the max space between the reserved space and the space calculate via reserved ratio.
0.3.2
celeborn.worker.storage.disk.reserve.size
5G
false
Celeborn worker reserved space for each disk.
0.3.0
celeborn.worker.disk.reserve.size
celeborn.worker.storage.expireDirs.timeout
1h
false
The timeout for a expire dirs to be deleted on disk.
0.3.2
celeborn.worker.storage.storagePolicy.createFilePolicy

false
This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,S3,OSS
0.5.1
celeborn.worker.storage.storagePolicy.evictPolicy

false
This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS,S3,OSS. Definition: StorageTypes
StorageTypes
StorageTypes. Example: MEMORY,SSD
celeborn.worker.storage.workingDir
celeborn-worker/shuffle_data
false
Worker's working dir path name.
0.3.0
celeborn.worker.workingDir
celeborn.worker.writer.close.timeout
120s
false
Timeout for a file writer to close
0.2.0
celeborn.worker.writer.create.maxAttempts
false
Retry count for a file writer to create if its creation was failed.
0.2.0
celeborn.worker.writer.create.parallel.enabled
false
false
Whether to parallelize the creation of file writer.
0.6.3
celeborn.worker.writer.create.parallel.threads

false
Thread number of worker to parallelize the creation of file writer.
0.6.3
celeborn.worker.writer.create.parallel.timeout
120s
false
Timeout for a worker to create a file writer in parallel.
0.6.3
celeborn.worker.writer.hdfs.createAuxiliaryFile.maxRetries
false
Retry count for a auxiliary file including index file and success file with HDFS storage to create if its creation was failed.
0.7.0
celeborn.worker.writer.hdfs.createAuxiliaryFile.retryWait
200ms
false
Wait interval after failure to create a auxiliary file with HDFS storage and then retry it.
0.7.0
worker.flush.reuseCopyBuffer.enabled
true
false
Whether to enable reuse copy buffer for flush. Note that this copy buffer must not be referenced again after flushing. This means that, for example, the Hdfs(Oss or S3) client will not asynchronously access this buffer after the flush method returns, otherwise data modification problems will occur.
0.6.1
Client
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled
false
false
If this is true, Celeborn will adaptively split skewed partitions instead of reading them by Spark map range. Please note that this feature requires the
Celeborn-Optimize-Skew-Partitions-spark3_3.patch
0.6.0
celeborn.client.application.heartbeatInterval
10s
false
Interval for client to send heartbeat message to master.
0.3.0
celeborn.application.heartbeatInterval
celeborn.client.application.info.provider
org.apache.celeborn.common.client.DefaultApplicationInfoProvider
false
ApplicationInfoProvider class name. Default class is
org.apache.celeborn.common.client.DefaultApplicationInfoProvider
. Optional values: org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values.
0.6.1
celeborn.client.application.info.user-specific
false
User specific information for application registration, pattern is
=[,=]*
, e.g.
cluster=celeborn
0.6.1
celeborn.client.application.unregister.enabled
true
false
When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings.
0.3.2
celeborn.client.application.uuidSuffix.enabled
false
false
Whether to add UUID suffix for application id for unique. When
true
, add UUID suffix for unique application id. Currently, this only applies to Spark and MR.
0.6.0
celeborn.client.chunk.prefetch.enabled
false
false
Whether to enable chunk prefetch when creating CelebornInputStream.
0.5.1
celeborn.client.closeIdleConnections
true
false
Whether client will close idle connections.
0.3.0
celeborn.client.commitFiles.ignoreExcludedWorker
false
false
When true, LifecycleManager will skip workers which are in the excluded list.
0.3.0
celeborn.client.eagerlyCreateInputStream.threads
32
false
Threads count for streamCreatorPool in CelebornShuffleReader.
0.3.1
celeborn.client.excludePeerWorkerOnFailure.enabled
true
false
When true, Celeborn will exclude partition's peer worker on failure when push data to replica failed.
0.3.0
celeborn.client.excludedWorker.expireTimeout
180s
false
Timeout time for LifecycleManager to clear reserved excluded worker. Default to be 1.5 *
celeborn.master.heartbeat.worker.timeout
to cover worker heartbeat timeout check period
0.3.0
celeborn.worker.excluded.expireTimeout
celeborn.client.fetch.buffer.size
64k
false
Size of reducer partition buffer memory for shuffle reader. The fetched data will be buffered in memory before consuming. For performance consideration keep this buffer size not less than
celeborn.client.push.buffer.max.size
0.4.0
celeborn.client.fetch.dfsReadChunkSize
8m
false
Max chunk size for DfsPartitionReader.
0.3.1
celeborn.client.fetch.excludeWorkerOnFailure.enabled
false
false
Whether to enable shuffle client-side fetch exclude workers on failure.
0.3.0
celeborn.client.fetch.excludedWorker.expireTimeout

false
ShuffleClient is a static object, it will be used in the whole lifecycle of Executor, We give a expire time for excluded workers to avoid a transient worker issues.
0.3.0
celeborn.client.fetch.maxReqsInFlight
false
Amount of in-flight chunk fetch request.
0.3.0
celeborn.fetch.maxReqsInFlight
celeborn.client.fetch.maxRetriesForEachReplica
false
Max retry times of fetch chunk on each replica
0.3.0
celeborn.fetch.maxRetriesForEachReplica,celeborn.fetch.maxRetries
celeborn.client.fetch.pollChunk.wait
500ms
false
The waiting time for shuffle client to read the empty chunk on the work side.when there are many empty chunk in the shuffle partition of a small task,the current value can be set small to avoid long waiting times and the illusion of thetask getting stuck
0.6.1
celeborn.client.fetch.timeout
600s
false
Timeout for a task to open stream and fetch chunk.
0.3.0
celeborn.fetch.timeout
celeborn.client.flink.compression.enabled
true
false
Whether to compress data in Flink plugin.
0.3.0
remote-shuffle.job.enable-data-compression
celeborn.client.flink.inputGate.concurrentReadings
2147483647
false
Max concurrent reading channels for a input gate.
0.3.0
remote-shuffle.job.concurrent-readings-per-gate
celeborn.client.flink.inputGate.memory
32m
false
Memory reserved for a input gate.
0.3.0
remote-shuffle.job.memory-per-gate
celeborn.client.flink.inputGate.supportFloatingBuffer
true
false
Whether to support floating buffer in Flink input gates.
0.3.0
remote-shuffle.job.support-floating-buffer-per-input-gate
celeborn.client.flink.metrics.scope.shuffle
.taskmanager.....
false
Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured
0.6.0
celeborn.client.flink.open.stream.threads

false
Thread number of flink shuffle client to open buffer stream. Default value is Runtime.getRuntime.availableProcessors.
0.6.1
celeborn.client.flink.partitionConnectionException.enabled
false
false
If enabled,
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException
would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data.
0.6.0
celeborn.client.flink.resultPartition.memory
64m
false
Memory reserved for a result partition.
0.3.0
remote-shuffle.job.memory-per-partition
celeborn.client.flink.resultPartition.supportFloatingBuffer
true
false
Whether to support floating buffer for result partitions.
0.3.0
remote-shuffle.job.support-floating-buffer-per-output-gate
celeborn.client.flink.shuffle.fallback.policy
AUTO
false
Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use flink built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use flink built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above.
0.6.0
celeborn.client.inputStream.creation.window
16
false
Window size that CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario where multiple Partitions are read
0.5.1
celeborn.client.mr.pushData.max
32m
false
Max size for a push data sent from mr client.
0.4.0
celeborn.client.partition.reader.checkpoint.enabled
false
false
Whether or not checkpoint reads when re-creating a partition reader. Setting to true minimizes the amount of unnecessary reads during partition read retries
0.6.0
celeborn.client.partition.reader.waitLog.threshold
60s
false
The threshold in milliseconds for logging partition read wait time. Log messages will be generated when wait time exceeds multiples of this threshold.
0.6.2
celeborn.client.push.buffer.initial.size
8k
false
0.3.0
celeborn.push.buffer.initial.size
celeborn.client.push.buffer.max.size
64k
false
Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to
64KiB * 2000 = 125MiB
heap memory.
0.3.0
celeborn.push.buffer.max.size
celeborn.client.push.excludeWorkerOnFailure.enabled
false
false
Whether to enable shuffle client-side push exclude workers on failures.
0.3.0
celeborn.client.push.limit.inFlight.sleepInterval
50ms
false
Sleep interval when check netty in-flight requests to be done.
0.3.0
celeborn.push.limit.inFlight.sleepInterval
celeborn.client.push.limit.inFlight.timeout

false
Timeout for netty in-flight requests to be done. Default value should be
celeborn.client.push.timeout * 2
0.3.0
celeborn.push.limit.inFlight.timeout
celeborn.client.push.limit.strategy
SIMPLE
false
The strategy used to control the push speed. Valid strategies are SIMPLE and SLOWSTART. The SLOWSTART strategy usually works with congestion control mechanism on the worker side.
0.3.0
celeborn.client.push.maxBytesSizeInFlight.enabled
false
false
Whether
celeborn.client.push.maxBytesSizeInFlight.perWorker/total
is enabled
0.6.1
celeborn.push.maxBytesSizeInFlight.enabled
celeborn.client.push.maxBytesSizeInFlight.perWorker

false
Bytes size of Netty in-flight requests per worker. Default max memory of in flight requests per worker is
celeborn.client.push.maxReqsInFlight.perWorker
celeborn.client.push.buffer.max.size
* compression ratio(1 in worst case): 64KiB * 32 = 2MiB. This is an alternative to
celeborn.client.push.maxReqsInFlight.perWorker
in cases where records are huge and exceed the maximum memory.
0.6.1
celeborn.client.push.maxBytesSizeInFlight.total

false
Bytes size of total Netty in-flight requests. The maximum memory is
celeborn.client.push.maxReqsInFlight.total
celeborn.client.push.buffer.max.size
* compression ratio(1 in worst case): 64KiB * 256 = 16MiB. This is an addition to
celeborn.client.push.maxReqsInFlight.total
in cases where records are huge and exceed the maximum memory.
0.6.1
celeborn.client.push.maxReqsInFlight.perWorker
32
false
Amount of Netty in-flight requests per worker. Default max memory of in flight requests per worker is
celeborn.client.push.maxReqsInFlight.perWorker
celeborn.client.push.buffer.max.size
* compression ratio(1 in worst case): 64KiB * 32 = 2MiB. The maximum memory will not exceed
celeborn.client.push.maxReqsInFlight.total
0.3.0
celeborn.client.push.maxReqsInFlight.total
256
false
Amount of total Netty in-flight requests. The maximum memory is
celeborn.client.push.maxReqsInFlight.total
celeborn.client.push.buffer.max.size
* compression ratio(1 in worst case): 64KiB * 256 = 16MiB
0.3.0
celeborn.push.maxReqsInFlight
celeborn.client.push.queue.capacity
512
false
Push buffer queue size for a task. The maximum memory is
celeborn.client.push.buffer.max.size
celeborn.client.push.queue.capacity
, default: 64KiB * 512 = 32MiB
0.3.0
celeborn.push.queue.capacity
celeborn.client.push.replicate.enabled
false
false
When true, Celeborn worker will replicate shuffle data to another Celeborn worker asynchronously to ensure the pushed shuffle data won't be lost after the node failure. It's recommended to set
false
when
HDFS
is enabled in
celeborn.storage.availableTypes
0.3.0
celeborn.push.replicate.enabled
celeborn.client.push.retry.threads
false
Thread number to process shuffle re-send push data requests.
0.3.0
celeborn.push.retry.threads
celeborn.client.push.revive.batchSize
2048
false
Max number of partitions in one Revive request.
0.3.0
celeborn.client.push.revive.interval
100ms
false
Interval for client to trigger Revive to LifecycleManager. The number of partitions in one Revive request is
celeborn.client.push.revive.batchSize
0.3.0
celeborn.client.push.revive.maxRetries
false
Max retry times for reviving when celeborn push data failed.
0.3.0
celeborn.client.push.sendBufferPool.checkExpireInterval
30s
false
Interval to check expire for send buffer pool. If the pool has been idle for more than
celeborn.client.push.sendBufferPool.expireTimeout
, the pooled send buffers and push tasks will be cleaned up.
0.3.1
celeborn.client.push.sendBufferPool.expireTimeout
60s
false
Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, the send buffers and push tasks will be cleaned up.
0.3.1
celeborn.client.push.slowStart.initialSleepTime
500ms
false
The initial sleep time if the current max in flight requests is 0
0.3.0
celeborn.client.push.slowStart.maxSleepTime
2s
false
If celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a sleep strategy for each batch of requests, this controls the max sleep time if the max in flight requests limit is 1 for a long time
0.3.0
celeborn.client.push.sort.randomizePartitionId.enabled
false
false
Whether to randomize partitionId in push sorter. If true, partitionId will be randomized when sort data to avoid skew when push to worker
0.3.0
celeborn.push.sort.randomizePartitionId.enabled
celeborn.client.push.stageEnd.timeout
.io.connectionTimeout>
false
Timeout for waiting StageEnd. During this process, there are
celeborn.client.requestCommitFiles.maxRetries
times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value
celeborn..io.connectionTimeout
0.3.0
celeborn.push.stageEnd.timeout
celeborn.client.push.takeTaskMaxWaitAttempts
false
Max wait times if no task available to push to worker.
0.3.0
celeborn.client.push.takeTaskWaitInterval
50ms
false
Wait interval if no task available to push to worker.
0.3.0
celeborn.client.push.timeout
120s
false
Timeout for a task to push data rpc message. This value should better be more than twice of
celeborn..push.timeoutCheck.interval
0.3.0
celeborn.push.data.timeout
celeborn.client.readLocalShuffleFile.enabled
false
false
Enable read local shuffle file for clusters that co-deployed with yarn node manager.
0.3.1
celeborn.client.readLocalShuffleFile.threads
false
Threads count for read local shuffle file.
0.3.1
celeborn.client.registerShuffle.maxRetries
false
Max retry times for client to register shuffle.
0.3.0
celeborn.shuffle.register.maxRetries
celeborn.client.registerShuffle.retryWait
3s
false
Wait time before next retry if register shuffle failed.
0.3.0
celeborn.shuffle.register.retryWait
celeborn.client.requestCommitFiles.maxRetries
false
Max retry times for requestCommitFiles RPC.
0.3.0
celeborn.client.requestCommitFiles.retryWait
10s
false
Wait time before next retry if requestCommitFiles RPC failed.
0.6.3
celeborn.client.reserveSlots.maxRetries
false
Max retry times for client to reserve slots.
0.3.0
celeborn.slots.reserve.maxRetries
celeborn.client.reserveSlots.rackaware.enabled
false
false
Whether need to place different replicates on different racks when allocating slots.
0.3.1
celeborn.client.reserveSlots.rackware.enabled
celeborn.client.reserveSlots.retryWait
3s
false
Wait time before next retry if reserve slots failed.
0.3.0
celeborn.slots.reserve.retryWait
celeborn.client.rpc.cache.concurrencyLevel
32
false
The number of write locks to update rpc cache.
0.3.0
celeborn.rpc.cache.concurrencyLevel
celeborn.client.rpc.cache.expireTime
15s
false
The time before a cache item is removed.
0.3.0
celeborn.rpc.cache.expireTime
celeborn.client.rpc.cache.size
256
false
The max cache items count for rpc cache.
0.3.0
celeborn.rpc.cache.size
celeborn.client.rpc.commitFiles.askTimeout

false
Timeout for CommitHandler commit files.
0.4.1
celeborn.client.rpc.getReducerFileGroup.askTimeout

false
Timeout for ask operations during getting reducer file group information. During this process, there are
celeborn.client.requestCommitFiles.maxRetries
times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting.
0.2.0
celeborn.client.rpc.maxRetries
false
Max RPC retry times in client.
0.3.2
celeborn.client.rpc.registerShuffle.askTimeout

false
Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and
celeborn.client.reserveSlots.maxRetries
times for retry opportunities for reserving slots. User can customize this value according to your setting.
0.3.0
celeborn.rpc.registerShuffle.askTimeout
celeborn.client.rpc.requestPartition.askTimeout

false
Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are
celeborn.client.reserveSlots.maxRetries
times for retry opportunities for reserving slots. User can customize this value according to your setting.
0.2.0
celeborn.client.rpc.reserveSlots.askTimeout

false
Timeout for LifecycleManager request reserve slots.
0.3.0
celeborn.client.rpc.retryWait
1s
false
Client-specified time to wait before next retry on RpcTimeoutException.
0.5.4
celeborn.client.rpc.shared.threads
16
false
Number of shared rpc threads in LifecycleManager.
0.3.2
celeborn.client.shuffle.batchHandleChangePartition.interval
100ms
false
Interval for LifecycleManager to schedule handling change partition requests in batch.
0.3.0
celeborn.shuffle.batchHandleChangePartition.interval
celeborn.client.shuffle.batchHandleChangePartition.partitionBuckets
256
false
Max number of change partition requests which can be concurrently processed.
0.5.0
celeborn.client.shuffle.batchHandleChangePartition.threads
false
Threads number for LifecycleManager to handle change partition request in batch.
0.3.0
celeborn.shuffle.batchHandleChangePartition.threads
celeborn.client.shuffle.batchHandleCommitPartition.interval
5s
false
Interval for LifecycleManager to schedule handling commit partition requests in batch.
0.3.0
celeborn.shuffle.batchHandleCommitPartition.interval
celeborn.client.shuffle.batchHandleCommitPartition.threads
false
Threads number for LifecycleManager to handle commit partition request in batch.
0.3.0
celeborn.shuffle.batchHandleCommitPartition.threads
celeborn.client.shuffle.batchHandleReleasePartition.interval
5s
false
Interval for LifecycleManager to schedule handling release partition requests in batch.
0.3.0
celeborn.client.shuffle.batchHandleReleasePartition.threads
false
Threads number for LifecycleManager to handle release partition request in batch.
0.3.0
celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled
false
false
Whether to batch remove expired shuffles. This is an optimization switch on removing expired shuffles.
0.6.0
celeborn.client.shuffle.checkWorker.enabled
true
false
When true, before registering shuffle, LifecycleManager should check if current cluster have available workers, if cluster don't have available workers, fallback to default shuffle.
0.5.0
celeborn.client.spark.shuffle.checkWorker.enabled
celeborn.client.shuffle.compression.codec
LZ4
false
The codec used to compress shuffle data. By default, Celeborn provides three codecs:
lz4
zstd
none
none
means that shuffle compression is disabled. Since Flink version 1.16, zstd is supported for Flink shuffle client.
0.3.0
celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec
celeborn.client.shuffle.compression.zstd.level
false
Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory.
0.3.0
celeborn.shuffle.compression.zstd.level
celeborn.client.shuffle.decompression.lz4.xxhash.instance

false
Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE.
0.3.2
celeborn.client.shuffle.dynamicResourceEnabled
false
false
When enabled, the ChangePartitionManager will obtain candidate workers from the availableWorkers pool during heartbeats when worker resource change.
0.6.0
celeborn.client.shuffle.dynamicResourceFactor
0.5
false
The ChangePartitionManager will check whether (unavailable workers / shuffle allocated workers) is more than the factor before obtaining candidate workers from the requestSlots RPC response when
celeborn.client.shuffle.dynamicResourceEnabled
set true
0.6.0
celeborn.client.shuffle.expired.checkInterval
60s
false
Interval for client to check expired shuffles.
0.3.0
celeborn.shuffle.expired.checkInterval
celeborn.client.shuffle.integrityCheck.enabled
false
false
When
true
, enables end-to-end integrity checks for Spark workloads.
0.6.1
celeborn.client.shuffle.manager.port
false
Port used by the LifecycleManager on the Driver.
0.3.0
celeborn.shuffle.manager.port
celeborn.client.shuffle.partition.type
REDUCE
false
Type of shuffle's partition.
0.3.0
celeborn.shuffle.partition.type
celeborn.client.shuffle.partitionSplit.mode
SOFT
false
soft: the shuffle file size might be larger than split threshold. hard: the shuffle file size will be limited to split threshold.
0.3.0
celeborn.shuffle.partitionSplit.mode
celeborn.client.shuffle.partitionSplit.threshold
1G
false
Shuffle file size threshold, if file size exceeds this, trigger split.
0.3.0
celeborn.shuffle.partitionSplit.threshold
celeborn.client.shuffle.rangeReadFilter.enabled
false
false
If a spark application have skewed partition, this value can set to true to improve performance.
0.2.0
celeborn.shuffle.rangeReadFilter.enabled
celeborn.client.shuffle.register.filterExcludedWorker.enabled
false
false
Whether to filter excluded worker when register shuffle.
0.4.0
celeborn.client.shuffle.reviseLostShuffles.enabled
false
false
Whether to revise lost shuffles.
0.6.0
celeborn.client.shuffleDataLostOnUnknownWorker.enabled
false
false
Whether to mark shuffle data lost when unknown worker is detected.
0.6.3
celeborn.client.slot.assign.maxWorkers
10000
false
Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see
celeborn.master.slot.assign.maxWorkers
0.3.1
celeborn.client.spark.fetch.cleanFailedShuffle
false
false
whether to clean those disk space occupied by shuffles which cannot be fetched
0.6.0
celeborn.client.spark.fetch.cleanFailedShuffleInterval
1s
false
the interval to clean the failed-to-fetch shuffle files, only valid when celeborn.client.spark.fetch.cleanFailedShuffle is enabled
0.6.0
celeborn.client.spark.push.dynamicWriteMode.enabled
false
false
Whether to dynamically switch push write mode based on conditions.If true, shuffle mode will be only determined by partition count
0.5.0
celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold
2000
false
Threshold of shuffle partition number for dynamically switching push writer mode. When the shuffle partition number is greater than this value, use the sort-based shuffle writer for memory efficiency; otherwise use the hash-based shuffle writer for speed. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is true.
0.5.0
celeborn.client.spark.push.sort.memory.maxMemoryFactor
0.4
false
the max portion of executor memory which can be used for SortBasedWriter buffer (only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is enabled
0.5.0
celeborn.client.spark.push.sort.memory.smallPushTolerateFactor
0.2
false
Only be in effect when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is turned on. The larger this value is, the more aggressive Celeborn will enlarge the Sort-based Shuffle writer's memory threshold. Specifically, this config controls when to enlarge the sort shuffle writer's memory threshold. With N bytes data in memory and V as the value of this config, if the number of pushes, C, when using sort based shuffle writer C >= (1 + V) * C' where C' is the number of pushes if we were using hash based writer, we will enlarge the memory threshold by 2X.
0.5.0
celeborn.client.spark.push.sort.memory.threshold
64m
false
When SortBasedPusher use memory over the threshold, will trigger push data.
0.3.0
celeborn.push.sortMemory.threshold
celeborn.client.spark.push.sort.memory.useAdaptiveThreshold
false
false
Adaptively adjust sort-based shuffle writer's memory threshold
0.5.0
celeborn.client.spark.push.unsafeRow.fastWrite.enabled
true
false
This is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you have changed UnsafeRow's memory layout set this to false.
0.2.2
celeborn.client.spark.shuffle.fallback.numPartitionsThreshold
2147483647
false
Celeborn will only accept shuffle of partition number lower than this configuration value. This configuration only takes effect when
celeborn.client.spark.shuffle.fallback.policy
is
AUTO
0.5.0
celeborn.shuffle.forceFallback.numPartitionsThreshold,celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold
celeborn.client.spark.shuffle.fallback.policy
AUTO
false
Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use spark built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use spark built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota, shuffle partition number; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above.
0.5.0
celeborn.client.spark.shuffle.forceFallback.enabled
false
false
Always use spark built-in shuffle implementation. This configuration is deprecated, consider configuring
celeborn.client.spark.shuffle.fallback.policy
instead.
0.3.0
celeborn.shuffle.forceFallback.enabled
celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled
false
false
Whether to leverage Spark broadcast mechanism to send the GetReducerFileGroupResponse. If the response size is large and Spark executor number is large, the Spark driver network may be exhausted because each executor will pull the response from the driver. With broadcasting GetReducerFileGroupResponse, it prevents the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
0.6.0
celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.miniSize
512k
false
The size at which we use Broadcast to send the GetReducerFileGroupResponse to the executors.
0.6.0
celeborn.client.spark.shuffle.writer
HASH
false
Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false.
0.3.0
celeborn.shuffle.writer
celeborn.client.spark.stageRerun.enabled
true
false
Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException.
0.4.0
celeborn.client.spark.fetch.throwsFetchFailure
celeborn.identity.provider
org.apache.celeborn.common.identity.DefaultIdentityProvider
false
IdentityProvider class name. Default class is
org.apache.celeborn.common.identity.DefaultIdentityProvider
. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values.
0.6.0
celeborn.quota.identity.provider
celeborn.identity.user-specific.tenant
default
false
Tenant id if celeborn.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider.
0.6.0
celeborn.quota.identity.user-specific.tenant
celeborn.identity.user-specific.userName
default
false
User name if celeborn.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider.
0.6.0
celeborn.quota.identity.user-specific.userName
celeborn.master.endpoints
:9097
false
Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided by celeborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses
org.apache.celeborn.common.client.StaticMasterEndpointResolver
which take static master endpoints as input. Allowed pattern:
:[,:]*
, e.g.
clb1:9097,clb2:9098,clb3:9099
. If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver.
0.2.0
celeborn.master.endpoints.resolver
org.apache.celeborn.common.client.StaticMasterEndpointResolver
false
Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that provided resolver class should be present in the classpath.
0.5.2
celeborn.quota.enabled
true
false
When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user.
0.2.0
celeborn.quota.interruptShuffle.enabled
false
false
Whether to enable interrupt shuffle when quota exceeds.
0.6.0
celeborn.storage.availableTypes
HDD
false
Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical.
0.3.0
celeborn.storage.activeTypes
celeborn.storage.hdfs.dir

false
HDFS base directory for Celeborn to store shuffle data.
0.2.0
celeborn.storage.oss.access.key

false
OSS access key for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.dir

false
OSS base directory for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.endpoint

false
OSS endpoint for Celeborn to store shuffle data.
0.6.0
celeborn.storage.oss.ignore.credentials
true
false
Whether to skip oss credentials, disable this config to support jindo sdk .
0.6.0
celeborn.storage.oss.secret.key

false
OSS secret key for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.dir

false
S3 base directory for Celeborn to store shuffle data.
0.6.0
celeborn.storage.s3.endpoint.region

false
S3 endpoint for Celeborn to store shuffle data.
0.6.0
celeborn.tags.tagsExpr
true
Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example,
prod,high-io
filters workers that have both the
prod
and
high-io
tags.
0.6.0
Quota
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.quota.cluster.diskBytesWritten
9223372036854775807b
true
Cluster level quota dynamic configuration for written disk bytes.
0.6.0
celeborn.quota.cluster.diskFileCount
9223372036854775807
true
Cluster level quota dynamic configuration for written disk file count.
0.6.0
celeborn.quota.cluster.enabled
true
false
Whether to enable cluster-level quota.
0.6.0
celeborn.quota.cluster.hdfsBytesWritten
9223372036854775807b
true
Cluster level quota dynamic configuration for written hdfs bytes.
0.6.0
celeborn.quota.cluster.hdfsFileCount
9223372036854775807
true
Cluster level quota dynamic configuration for written hdfs file count.
0.6.0
celeborn.quota.enabled
true
false
When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user.
0.2.0
celeborn.quota.interruptShuffle.enabled
false
false
Whether to enable interrupt shuffle when quota exceeds.
0.6.0
celeborn.quota.tenant.diskBytesWritten
9223372036854775807b
true
Tenant level quota dynamic configuration for written disk bytes.
0.5.0
celeborn.quota.tenant.diskFileCount
9223372036854775807
true
Tenant level quota dynamic configuration for written disk file count.
0.5.0
celeborn.quota.tenant.enabled
true
false
Whether to enable tenant-level quota.
0.6.0
celeborn.quota.tenant.hdfsBytesWritten
9223372036854775807b
true
Tenant level quota dynamic configuration for written hdfs bytes.
0.5.0
celeborn.quota.tenant.hdfsFileCount
9223372036854775807
true
Tenant level quota dynamic configuration for written hdfs file count.
0.5.0
celeborn.quota.user.diskBytesWritten
9223372036854775807b
true
User level quota dynamic configuration for written disk bytes.
0.6.0
celeborn.quota.user.diskFileCount
9223372036854775807
true
User level quota dynamic configuration for written disk file count.
0.6.0
celeborn.quota.user.enabled
true
false
Whether to enable user-level quota.
0.6.0
celeborn.quota.user.hdfsBytesWritten
9223372036854775807b
true
User level quota dynamic configuration for written hdfs bytes.
0.6.0
celeborn.quota.user.hdfsFileCount
9223372036854775807
true
User level quota dynamic configuration for written hdfs file count.
0.6.0
Network
The various transport modules which can be configured are:
Module
Parent Module
Description
rpc_app
rpc
Configure control plane RPC environment used by Celeborn within the application. For backward compatibility, supports fallback to
rpc
parent module for missing configuration.
Note, this is for RPC environment - see below for other transport modules
rpc_service
rpc
Configure control plane RPC environment when communicating with Celeborn service hosts. This includes all RPC communication from application to Celeborn Master/Workers, as well as between Celeborn masters/workers themselves.
For backward compatibility, supports fallback to
rpc
parent module for missing configuration.
As with
rpc_app
, this is only for RPC environment see below for other transport modules.
rpc
Fallback parent transport module for
rpc_app
and
rpc_service
. It is advisible to use the specific transport modules while configuring -
rpc
exists primarily for backward compatibility
push
Configure transport module for handling data push at Celeborn workers
fetch
Configure transport module for handling data fetch at Celeborn workers
data
Configure transport module for handling data push and fetch at Celeborn apps
replicate
Configure transport module for handling data replication between Celeborn workers
Some network configurations might apply in specific scenarios, for example
push
module for
io.maxRetries
and
io.retryWait
in flink client. Please see the full list below for details.
Key
Default
isDynamic
Description
Since
Deprecated
celeborn..fetch.timeoutCheck.interval
5s
false
Interval for checking fetch data timeout. It only support setting
to
data
since it works for shuffle client fetch data.
0.3.0
celeborn..fetch.timeoutCheck.threads
false
Threads num for checking fetch data timeout. It only support setting
to
data
since it works for shuffle client fetch data.
0.3.0
celeborn..heartbeat.interval
60s
false
The heartbeat interval between worker and client. If setting
to
push
, it works for worker receiving push data. If setting
to
fetch
, it works for worker fetch server. If you are using the "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.push.heartbeat.interval" and "celeborn.fetch.heartbeat.interval".
0.3.0
celeborn.client.heartbeat.interval
celeborn..io.backLog
false
Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
push
, it works for worker receiving push data. If setting
to
replicate
, it works for replicate server of worker replicating data to peer worker. If setting
to
fetch
, it works for worker fetch server.
celeborn..io.clientThreads
false
Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data, of which default value is determined by celeborn.
.io.threads . If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker.
celeborn..io.conflictAvoidChooser.enable
false
false
Whether to use conflict avoid event executor chooser in the client thread pool. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker.
0.5.4
celeborn..io.connectTimeout

false
Socket connect timeout. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
replicate
, it works for the replicate client of worker replicating data to peer worker.
celeborn..io.connectionTimeout

false
Connection active timeout. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
push
, it works for worker receiving push data. If setting
to
replicate
, it works for replicate server or client of worker replicating data to peer worker. If setting
to
fetch
, it works for worker fetch server.
celeborn..io.lazyFD
true
false
Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting
to
fetch
, it works for worker fetch server.
celeborn..io.maxRetries
false
Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker. If setting
to
push
, it works for Flink shuffle client push data.
celeborn..io.mode

false
Netty EventLoopGroup backend, available options: NIO, EPOLL, KQUEUE. For Linux environments, EPOLL is used if available before using NIO. For MacOS/BSD environments, KQUEUE is used if available before using NIO.
celeborn..io.numConnectionsPerPeer
false
Number of concurrent connections between two nodes. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker.
celeborn..io.preferDirectBufs
true
false
If true, we will prefer allocating off-heap byte buffers within Netty. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
push
, it works for worker receiving push data. If setting
to
replicate
, it works for replicate server or client of worker replicating data to peer worker. If setting
to
fetch
, it works for worker fetch server.
celeborn..io.receiveBuffer
0b
false
Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
push
, it works for worker receiving push data. If setting
to
replicate
, it works for replicate server or client of worker replicating data to peer worker. If setting
to
fetch
, it works for worker fetch server.
0.2.0
celeborn..io.retryWait
5s
false
Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker. If setting
to
push
, it works for Flink shuffle client push data.
0.2.0
celeborn..io.saslTimeout
30s
false
Timeout for a single round trip of auth message exchange, in milliseconds.
0.5.0
celeborn..io.sendBuffer
0b
false
Send buffer size (SO_SNDBUF). If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
data
, it works for shuffle client push and fetch data. If setting
to
push
, it works for worker receiving push data. If setting
to
replicate
, it works for replicate server or client of worker replicating data to peer worker. If setting
to
fetch
, it works for worker fetch server.
0.2.0
celeborn..io.serverThreads
false
Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting
to
rpc_app
, works for shuffle client. If setting
to
rpc_service
, works for master or worker. If setting
to
push
, it works for worker receiving push data. If setting
to
replicate
, it works for replicate server of worker replicating data to peer worker. If setting
to
fetch
, it works for worker fetch server.
celeborn..io.threads
false
Default number of threads used in the server and client thread pool. This specifies thread configuration based on JVM's allocation of cores. If setting
to
data
, it works for shuffle client push and fetch data.
celeborn..push.timeoutCheck.interval
5s
false
Interval for checking push data timeout. If setting
to
data
, it works for shuffle client push data. If setting
to
push
, it works for Flink shuffle client push data. If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker.
0.3.0
celeborn..push.timeoutCheck.threads
false
Threads num for checking push data timeout. If setting
to
data
, it works for shuffle client push data. If setting
to
push
, it works for Flink shuffle client push data. If setting
to
replicate
, it works for replicate client of worker replicating data to peer worker.
0.3.0
celeborn..rpc.dispatcher.threads

false
Threads number of message dispatcher event loop for roles
celeborn.io.maxDefaultNettyThreads
64
false
Max default netty threads
0.3.2
celeborn.network.advertise.preferIpAddress

false
When
true
, prefer to use IP address, otherwise FQDN for advertise address.
0.6.0
celeborn.network.bind.preferIpAddress
true
false
When
true
, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind.
0.3.0
celeborn.network.bind.wildcardAddress
false
false
When
true
, the bind address will be set to a wildcard address, while the advertise address will remain as whatever is set by
celeborn.network.advertise.preferIpAddress
. The wildcard address is a special local IP address, and usually refers to 'any' and can only be used for bind operations. In the case of IPv4, this is 0.0.0.0 and in the case of IPv6 this is ::0. This is helpful in dual-stack environments, where the service must listen to both IPv4 and IPv6 clients.
0.6.0
celeborn.network.connect.timeout
10s
false
Default socket connect timeout.
0.2.0
celeborn.network.memory.allocator.numArenas

false
Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2.
0.3.0
celeborn.network.memory.allocator.verbose.metric
false
false
Whether to enable verbose metric for pooled allocator.
0.3.0
celeborn.network.timeout
240s
false
Default timeout for network operations.
0.2.0
celeborn.port.maxRetries
16
false
When port is occupied, we will retry for max retry times.
0.2.0
celeborn.rpc.RpcEndpointVerifier.separate.enabled
true
false
Whether to enable dispatcher process RpcEndpointVerifier's request separately.
0.7.0
celeborn.rpc.askTimeout
60s
false
Timeout for RPC ask operations. It's recommended to set at least
240s
when
HDFS
is enabled in
celeborn.storage.availableTypes
0.2.0
celeborn.rpc.connect.threads
64
false
0.2.0
celeborn.rpc.dispatcher.threads
false
Threads number of message dispatcher event loop. Default to 0, which is availableCore.
0.3.0
celeborn.rpc.dispatcher.numThreads
celeborn.rpc.dump.interval
60s
false
min interval (ms) for RPC framework to dump performance summary
0.6.0
celeborn.rpc.inbox.capacity
false
Specifies size of the in memory bounded capacity.
0.5.0
celeborn.rpc.io.threads

false
Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors.
0.2.0
celeborn.rpc.lookupTimeout
30s
false
Timeout for RPC lookup operations.
0.2.0
celeborn.rpc.retryWait
1s
false
Time to wait before next retry on RpcTimeoutException.
0.5.4
celeborn.rpc.slow.interval

false
min interval (ms) for RPC framework to log slow RPC
0.6.0
celeborn.rpc.slow.threshold
1s
false
threshold for RPC framework to log slow RPC
0.6.0
celeborn.shuffle.io.maxChunksBeingTransferred

false
The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see
celeborn..io.maxRetries
and
celeborn..io.retryWait
), if those limits are reached the task will fail with fetch failure.
0.2.0
celeborn.ssl..enabled
false
false
Enables SSL for securing wire traffic.
0.5.0
celeborn.ssl..enabledAlgorithms

false
A comma-separated list of ciphers. The specified ciphers must be supported by JVM.
The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide. The list for Java 11, for example, can be found at
this page
Note: If not set, the default cipher suite for the JRE will be used
0.5.0
celeborn.ssl..keyStore

false
Path to the key store file.
The path can be absolute or relative to the directory in which the process is started.
0.5.0
celeborn.ssl..keyStorePassword

false
Password to the key store.
0.5.0
celeborn.ssl..protocol
TLSv1.2
false
TLS protocol to use.
The protocol must be supported by JVM.
The reference list of protocols can be found in the "Additional JSSE Standard Names" section of the Java security guide. For Java 11, for example, the list can be found
here
0.5.0
celeborn.ssl..sslHandshakeTimeoutMs
10s
false
The timeout for the SSL handshake (in milliseconds). The default value is set to the current Netty default. This is applicable for
rpc_app
and
rpc_service
modules
0.5.4
celeborn.ssl..trustStore

false
Path to the trust store file.
The path can be absolute or relative to the directory in which the process is started.
0.5.0
celeborn.ssl..trustStorePassword

false
Password for the trust store.
0.5.0
celeborn.ssl..trustStoreReloadIntervalMs
10s
false
The interval at which the trust store should be reloaded (in milliseconds), when enabled. This setting is mostly only useful for server components, not applications.
0.5.0
celeborn.ssl..trustStoreReloadingEnabled
false
false
Whether the trust store should be reloaded periodically.
This setting is mostly only useful for Celeborn services (masters, workers), and not applications.
0.5.0
Columnar Shuffle
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.columnarShuffle.batch.size
10000
false
Vector batch size for columnar shuffle.
0.3.0
celeborn.columnar.shuffle.batch.size
celeborn.columnarShuffle.codegen.enabled
false
false
Whether to use codegen for columnar-based shuffle.
0.3.0
celeborn.columnar.shuffle.codegen.enabled
celeborn.columnarShuffle.enabled
false
false
Whether to enable columnar-based shuffle.
0.2.0
celeborn.columnar.shuffle.enabled
celeborn.columnarShuffle.encoding.dictionary.enabled
false
false
Whether to use dictionary encoding for columnar-based shuffle data.
0.3.0
celeborn.columnar.shuffle.encoding.dictionary.enabled
celeborn.columnarShuffle.encoding.dictionary.maxFactor
0.3
false
Max factor for dictionary size. The max dictionary size is
min(32.0 KiB, celeborn.columnarShuffle.batch.size * celeborn.columnar.shuffle.encoding.dictionary.maxFactor)
0.3.0
celeborn.columnar.shuffle.encoding.dictionary.maxFactor
celeborn.columnarShuffle.offHeap.enabled
false
false
Whether to use off heap columnar vector.
0.3.0
celeborn.columnar.offHeap.enabled
Metrics
Below metrics configuration both work for master and worker.
Key
Default
isDynamic
Description
Since
Deprecated
celeborn.metrics.capacity
4096
false
The maximum number of metrics which a source can use to generate output strings.
0.2.0
celeborn.metrics.collectPerfCritical.enabled
false
false
It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them.
0.2.0
celeborn.metrics.conf

false
Custom metrics configuration file path. Default use
metrics.properties
in classpath.
0.3.0
celeborn.metrics.enabled
true
false
When true, enable metrics system.
0.2.0
celeborn.metrics.extraLabels
false
If default metric labels are not enough, extra metric labels can be customized. Labels' pattern is:
=[,=]*
; e.g.
env=prod,version=1
0.3.0
celeborn.metrics.json.path
/metrics/json
false
URI context path of json metrics HTTP server.
0.4.0
celeborn.metrics.json.pretty.enabled
true
false
When true, view metrics in json pretty format
0.4.0
celeborn.metrics.loggerSink.output.enabled
false
false
Whether to output scraped metrics to the logger. This config will have effect if you enabled logger sink.If you will not scrape metrics periodically, do add
org.apache.celeborn.common.metrics.sink.LoggerSink
to metrics.properties.
0.6.0
celeborn.metrics.loggerSink.scrape.interval
30min
false
The interval of logger sink to scrape its own metrics. This config will have effect if you enabled logger sink. If you will not scrape metrics periodically, do add
org.apache.celeborn.common.metrics.sink.LoggerSink
to metrics.properties.
0.6.0
celeborn.metrics.prometheus.path
/metrics/prometheus
false
URI context path of prometheus metrics HTTP server.
0.4.0
celeborn.metrics.sample.rate
1.0
false
It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0].
0.2.0
celeborn.metrics.timer.slidingWindow.size
4096
false
The sliding window size of timer metric.
0.2.0
celeborn.metrics.worker.app.topResourceConsumption.bytesWrittenThreshold
0b
false
Threshold of bytes written for top resource consumption applications list of worker. The application which has bytes written less than this threshold will not be included in the top resource consumption list, including diskBytesWritten and hdfsBytesWritten.
0.6.0
celeborn.metrics.worker.app.topResourceConsumption.count
false
Size for top items about top resource consumption applications list of worker. The top resource consumption is determined by sum of diskBytesWritten and hdfsBytesWritten. The top resource consumption count prevents the total number of metrics from exceeding the metrics capacity. Note: This will add applicationId as label which is considered as a high cardinality label, be careful enabling it on metrics systems that are not optimized for high cardinality columns.
0.6.0
celeborn.metrics.worker.appLevel.enabled
true
false
When true, enable worker application level metrics. Note: applicationId is considered as a high cardinality label, be careful enabling it on metrics systems that are not optimized for high cardinality columns.
0.6.0
celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold
10
false
Force append worker pause spent time even if worker still in pause serving state. Help user can find worker pause spent time increase, when worker always been pause state.
metrics.properties
*.sink.csv.class
org.apache.celeborn.common.metrics.sink.CsvSink
*.sink.prometheusServlet.class
org.apache.celeborn.common.metrics.sink.PrometheusServlet
Environment Variables
Recommend configuring in
conf/celeborn-env.sh
Key
Default
Description
CELEBORN_HOME
$(cd "`dirname "$0"`"/..; pwd)
CELEBORN_CONF_DIR
${CELEBORN_CONF_DIR:-"${CELEBORN_HOME}/conf"}
CELEBORN_MASTER_MEMORY
1 GB
CELEBORN_WORKER_MEMORY
1 GB
CELEBORN_WORKER_OFFHEAP_MEMORY
1 GB
CELEBORN_MASTER_JAVA_OPTS
CELEBORN_WORKER_JAVA_OPTS
CELEBORN_PID_DIR
${CELEBORN_HOME}/pids
CELEBORN_LOG_DIR
${CELEBORN_HOME}/logs
CELEBORN_SSH_OPTS
-o StrictHostKeyChecking=no
ssh opts for
start-all
and
stop-all
operations
CELEBORN_SLEEP
Waiting time for
start-all
and
stop-all
operations
CELEBORN_PREFER_JEMALLOC
set
true
to enable jemalloc memory allocator
CELEBORN_JEMALLOC_PATH
jemalloc library path
CELEBORN_NO_DAEMONIZE
set
true
to run the proposed command in the foreground
Tuning
Assume we have a cluster described as below:
5 Celeborn Workers with 20 GB off-heap memory and 10 disks.
As we need to reserve 20% off-heap memory for netty,
so we could assume 16 GB off-heap memory can be used for flush buffers.
If
spark.celeborn.client.push.buffer.max.size
is 64 KB and
celeborn.worker.flusher.buffer.size
is 256 KB, we can have total slots up to 327,680 slots and in-flight requests up to 1,310,720.
If you have 8192 mapper tasks, you could set
spark.celeborn.client.push.maxReqsInFlight=160
to gain performance improvements.
In-Flight Request Calculation (1,310,720):
Given:
16 GB
usable off-heap per worker,
celeborn.worker.flusher.buffer.size = 256 KB
spark.celeborn.client.push.buffer.max.size = 64 KB
and
5 Celeborn workers
Steps:
slots_per_worker = 16 GB / 256 KB -> 65,536
total_slots = 65,536 * 5 -> 327,680
requests_per_slot = 256 KB / 64 KB -> 4
total_inflight_requests = 327,680 × 4 -> 1,310,720
Rack Awareness
Celeborn can be rack-aware by setting
celeborn.client.reserveSlots.rackware.enabled
to
true
on client side.
Shuffle partition block replica placement will use rack awareness for fault tolerance by placing one shuffle partition replica
on a different rack. This provides data availability in the event of a network switch failure or partition within the cluster.
Celeborn master daemons obtain the rack id of the cluster workers by invoking either an external script or Java class as specified by configuration files.
Using either the Java class or external script for topology, output must adhere to the java
org.apache.hadoop.net.DNSToSwitchMapping
interface.
The interface expects a one-to-one correspondence to be maintained and the topology information in the format of
/myrack/myhost
where
is the topology delimiter,
myrack
is the rack identifier, and
myhost
is the individual host.
Assuming a single
/24
subnet per rack, one could use the format of
/192.168.100.0/192.168.100.5
as a unique rack-host topology mapping.
To use the Java class for topology mapping, the class name is specified by the
celeborn.hadoop.net.topology.node.switch.mapping.impl
parameter in the master configuration file.
An example,
NetworkTopology.java
, is included with the Celeborn distribution and can be customized by the Celeborn administrator.
Using a Java class instead of an external script has a performance benefit in that Celeborn doesn't need to fork an external process when a new worker node registers itself.
If implementing an external script, it will be specified with the
celeborn.hadoop.net.topology.script.file.name
parameter in the master side configuration files.
Unlike the Java class, the external topology script is not included with the Celeborn distribution and is provided by the administrator.
Celeborn will send multiple IP addresses to ARGV when forking the topology script. The number of IP addresses sent to the topology script
is controlled with
celeborn.hadoop.net.topology.script.number.args
and defaults to 100.
If
celeborn.hadoop.net.topology.script.number.args
was changed to 1, a topology script would get forked for each IP submitted by workers.
If
celeborn.hadoop.net.topology.script.file.name
or
celeborn.hadoop.net.topology.node.switch.mapping.impl
is not set, the rack id
/default-rack
is returned for any passed IP address.
While this behavior appears desirable, it can cause issues with shuffle partition block replication as default behavior
is to write one replicated block off rack and is unable to do so as there is only a single rack named
/default-rack
Example can refer to
Hadoop Rack Awareness
since Celeborn use hadoop's code about rack-aware.
Worker Recover Status After Restart
ShuffleClient
records the shuffle partition location's host, service port, and filename,
to support workers recovering reading existing shuffle data after worker restart,
during worker shutdown, workers should store the meta about reading shuffle partition files in RocksDB or LevelDB(deprecated),
and restore the meta after restarting workers, also workers should keep a stable service port to support
ShuffleClient
retry reading data. Users should set
celeborn.worker.graceful.shutdown.enabled
to
true
and
set below service port with stable port to support worker recover status.
celeborn.worker.rpc.port
celeborn.worker.fetch.port
celeborn.worker.push.port
celeborn.worker.replicate.port