AvroConfluent
| Input | Output | Alias | 
|---|---|---|
| ✔ | ✗ | 
Description
AvroConfluent supports decoding single-object Avro messages commonly used with Kafka and Confluent Schema Registry. Each Avro message embeds a schema ID that can be resolved to the actual schema with the help of the Schema Registry. Schemas are cached once resolved.
Data Types Matching
The table below shows all data types supported by the Apache Avro format, and their corresponding ClickHouse data types in INSERT and SELECT queries.
| Avro data type INSERT | ClickHouse data type | Avro data type SELECT | 
|---|---|---|
| boolean,int,long,float,double | Int(8\16\32), UInt(8\16\32) | int | 
| boolean,int,long,float,double | Int64, UInt64 | long | 
| boolean,int,long,float,double | Float32 | float | 
| boolean,int,long,float,double | Float64 | double | 
| bytes,string,fixed,enum | String | bytesorstring* | 
| bytes,string,fixed | FixedString(N) | fixed(N) | 
| enum | Enum(8\16) | enum | 
| array(T) | Array(T) | array(T) | 
| map(V, K) | Map(V, K) | map(string, K) | 
| union(null, T),union(T, null) | Nullable(T) | union(null, T) | 
| union(T1, T2, …)** | Variant(T1, T2, …) | union(T1, T2, …)** | 
| null | Nullable(Nothing) | null | 
| int (date)*** | Date, Date32 | int (date)*** | 
| long (timestamp-millis)*** | DateTime64(3) | long (timestamp-millis)*** | 
| long (timestamp-micros)*** | DateTime64(6) | long (timestamp-micros)*** | 
| bytes (decimal)*** | DateTime64(N) | bytes (decimal)*** | 
| int | IPv4 | int | 
| fixed(16) | IPv6 | fixed(16) | 
| bytes (decimal)*** | Decimal(P, S) | bytes (decimal)*** | 
| string (uuid)*** | UUID | string (uuid)*** | 
| fixed(16) | Int128/UInt128 | fixed(16) | 
| fixed(32) | Int256/UInt256 | fixed(32) | 
| record | Tuple | record | 
* bytes is default, controlled by setting output_format_avro_string_column_pattern
**  The Variant type implicitly accepts null as a field value, so for example the Avro union(T1, T2, null) will be converted to Variant(T1, T2).
As a result, when producing Avro from ClickHouse, we have to always include the null type to the Avro union type set as we don't know if any value is actually null during the schema inference.
Unsupported Avro logical data types:
- time-millis
- time-micros
- duration
Example Usage
To quickly verify schema resolution, you can use kafkacat with clickhouse-local:
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
To use AvroConfluent with Kafka:
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent';
-- for debug purposes you can set format_avro_schema_registry_url in a session.
-- this way cannot be used in production
SET format_avro_schema_registry_url = 'http://schema-registry';
SELECT * FROM topic1_stream;
Format Settings
The Schema Registry URL is configured with format_avro_schema_registry_url.
Setting format_avro_schema_registry_url needs to be configured in users.xml to maintain it’s value after a restart. Also you can use the format_avro_schema_registry_url setting of the Kafka table engine.
| Setting | Description | Default | 
|---|---|---|
| input_format_avro_allow_missing_fields | For Avro/AvroConfluent format: when field is not found in schema use default value instead of error | 0 | 
| input_format_avro_null_as_default | For Avro/AvroConfluent format: insert default in case of null and non Nullable column | 0 | 
| format_avro_schema_registry_url | For AvroConfluent format: Confluent Schema Registry URL. |