schema_registry_converter

This library provides a way of using the Confluent Schema Registry in a way that is compliant with the Java client. Since Karapace is API compatible, it can also be used with this library. The release notes can be found on GitHub. Consuming/decoding and producing/encoding are supported. It's also possible to provide the schema to use when decoding, and you can include references when decoding. Without a schema provided, the latest schema with the same subject will be used.

It's supposed to be feature complete compared to the Java version. If anything is missing or not working as expected, please create an issue or start a discussion on GitHub Discussions. An example of using this library async with Protobuf to produce data to Kafka can be found in ksqlDB-GraphQL-poc. A blog post with a bit of background on this library, titled "Confluent Schema Registry and Rust", is also available.

Getting Started

schema_registry_converter.rs is available on crates.io. It is recommended to look there for the newest and most elaborate documentation. It has a couple of feature flags; be sure to set them correctly.

To use it to convert using Avro async, use:
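For example, a minimal dependency declaration could look like this (the version number is an assumption; check crates.io for the current release and exact feature names):

```toml
[dependencies]
# version is an assumption; see crates.io for the latest release
schema_registry_converter = { version = "2.1.0", features = ["avro"] }
```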

For simplicity there are easy variants that internally use an Arc, making them easier to use at the price of some overhead. To use the easy variants, add the easy feature and use the structs whose names start with Easy to do the conversions.
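A minimal sketch of constructing one of the easy variants, assuming the easy feature is enabled; the module path is an assumption based on the docs, so verify it on crates.io:

```rust
use schema_registry_converter::async_impl::easy_avro::EasyAvroDecoder; // path assumed
use schema_registry_converter::async_impl::schema_registry::SrSettings;

// The Easy* structs keep the underlying converter behind an Arc, which makes
// them easier to share between tasks at the price of some overhead.
fn make_decoder() -> EasyAvroDecoder {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    EasyAvroDecoder::new(sr_settings)
}
```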

...and see the docs for how to use it.

All the converters also have a blocking (non-async) version; in that case use something like:
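A sketch of the dependency declaration for the blocking variant, assuming the feature is named blocking and that the async default features need to be disabled; verify against crates.io:

```toml
[dependencies]
# feature names and the need for default-features = false are assumptions; see crates.io
schema_registry_converter = { version = "2.1.0", default-features = false, features = ["avro", "blocking"] }
```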

If you need to use both in a project you can use something like the following, but you have to be wary that you import the correct paths depending on your use.
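With both variants enabled, aliasing the imports keeps it obvious which one a call site uses (module paths as in the 2.x docs):

```rust
// The async and blocking converters share struct names but live under
// different module paths, so aliases keep the call sites unambiguous.
use schema_registry_converter::async_impl::avro::AvroEncoder as AsyncAvroEncoder;
use schema_registry_converter::blocking::avro::AvroEncoder as BlockingAvroEncoder;
```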

Consumer

For consuming messages encoded with the schema registry, you need to fetch the correct schema from the schema registry to transform the raw bytes into a record. For clarity, error handling is omitted from the diagram.
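A minimal sketch of the decoding side, assuming the 2.x async API and a registry running at localhost:8081 (both assumptions); exact signatures may differ between releases, so check the docs on crates.io:

```rust
use schema_registry_converter::async_impl::avro::AvroDecoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;

// The decoder reads the schema id from the first bytes of the payload,
// fetches (and caches) the matching schema, and returns the decoded value.
async fn decode_payload(payload: Option<&[u8]>) {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    // mut because some releases update the internal schema cache through &mut self
    let mut decoder = AvroDecoder::new(sr_settings);
    match decoder.decode(payload).await {
        Ok(result) => println!("decoded value: {:?}", result.value),
        Err(e) => eprintln!("decoding failed: {:?}", e),
    }
}
```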

Producer

For producing messages which can be properly consumed by other clients, the proper id needs to be encoded with the message. To get the correct id, it might be necessary to register a new schema. For clarity, error handling is omitted from the diagram.
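A minimal sketch of the encoding side under the same assumptions (2.x async API, registry at localhost:8081, a topic called heartbeat with a long beat field); again, check crates.io for the exact signatures:

```rust
use avro_rs::types::Value;
use schema_registry_converter::async_impl::avro::AvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;

// The encoder resolves (or registers) the schema for the subject derived from
// the strategy, caches the id, and prefixes the encoded bytes with it.
async fn encode_beat() {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    let mut encoder = AvroEncoder::new(sr_settings);
    // false means this is the value (not the key) of the record
    let strategy = SubjectNameStrategy::TopicNameStrategy(String::from("heartbeat"), false);
    match encoder.encode(vec![("beat", Value::Long(3))], &strategy).await {
        Ok(bytes) => println!("encoded {} bytes", bytes.len()),
        Err(e) => eprintln!("encoding failed: {:?}", e),
    }
}
```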

Example with consumer and producer using Avro (blocking)

Examples which do both consuming/decoding and producing/encoding. To use structs with Avro, they must implement either the serde::Deserialize or serde::Serialize trait. The examples are especially useful when updating from the 1.x.x version; when starting fresh, you probably want to use the async versions.
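As a sketch of what such a struct and a blocking round trip might look like (names, topic, and registry URL are assumptions, and a matching schema is assumed to be registered already):

```rust
use serde::{Deserialize, Serialize};
use schema_registry_converter::blocking::avro::{AvroDecoder, AvroEncoder};
use schema_registry_converter::blocking::schema_registry::SrSettings;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;

// The struct needs serde's Serialize/Deserialize, as mentioned above.
#[derive(Debug, Serialize, Deserialize)]
struct Heartbeat {
    beat: i64,
}

fn round_trip() {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    let mut encoder = AvroEncoder::new(sr_settings.clone());
    let mut decoder = AvroDecoder::new(sr_settings);
    let strategy = SubjectNameStrategy::TopicNameStrategy(String::from("heartbeat"), false);
    // encode_struct looks up the latest schema for the subject, so it assumes
    // a heartbeat-value schema has been registered before this runs
    let bytes = encoder
        .encode_struct(Heartbeat { beat: 3 }, &strategy)
        .expect("encoding failed");
    let decoded = decoder.decode(Some(bytes.as_slice())).expect("decoding failed");
    println!("decoded value: {:?}", decoded.value);
}
```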

Example of posting a schema to the schema registry
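A sketch of registering a schema directly, assuming the 2.x blocking API; the post_schema function and the SuppliedSchema layout are taken from the docs as I understand them, so verify the exact fields on crates.io:

```rust
use schema_registry_converter::blocking::schema_registry::{post_schema, SrSettings};
use schema_registry_converter::schema_registry_common::{SchemaType, SuppliedSchema};

// Registers an Avro schema for the (assumed) subject "heartbeat-value".
fn register_heartbeat_schema() {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    let schema = SuppliedSchema {
        name: Some(String::from("nl.openweb.data.Heartbeat")),
        schema_type: SchemaType::Avro,
        schema: String::from(
            r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#,
        ),
        references: vec![],
    };
    match post_schema(&sr_settings, String::from("heartbeat-value"), schema) {
        Ok(registered) => println!("registered schema with id {}", registered.id),
        Err(e) => eprintln!("registering the schema failed: {:?}", e),
    }
}
```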

Relation to related libraries

The Avro part of the conversion is handled by avro-rs. As such, I don't include tests for every possible schema. While I used rdkafka in combination to successfully consume from and produce to Kafka, and while it's used in the example, this crate has no direct dependency on it. All this crate does is convert [u8] <-> Some Value (based on the converter used). With JSON and Protobuf some other dependencies are pulled in by enabling those features. I have tried to encapsulate all the errors in the SRCError type, so even when you get a panic/error that's an SRCError, it could be an error from one of the dependencies. Please make sure you are using the library correctly, and the error is not caused by a dependency, before creating an issue.

Tests

Because mockito, used for mocking the schema registry responses, runs in a separate thread, tests have to be run with --test-threads=1, for example: cargo +stable test --color=always --features avro,json,proto_decoder,proto_raw -- --nocapture --test-threads=1

Integration test

The integration tests require a Kafka cluster running on the default ports. They will create topics, register schemas, and produce and consume some messages. They are only included when compiled with the kafka_test feature, so to include them in testing, cargo +stable test --all-features --color=always -- --nocapture --test-threads=1 needs to be run. The 'prepare_integration_test.sh' script can be used to create the three topics needed for the tests. To ensure Java compatibility, the schema-registry-test-app docker image also needs to be run.

License

This project is licensed under either of

  • Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
  • MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Schema Registry Converter by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Issues

Collection of the latest Issues

arkanmgerges

Is your feature request related to a problem? Please describe. I have a use case:

  1. Creating schema with references
  2. Validating the schema

I have a user schema with "id" and "role_name", and a user_command schema which is used as part of a microservice and which also uses the user schema as a reference.

1. User and User Command schemas:

2. Registering the schemas into the schema registry:

Up to here there is no problem, and the schema will be registered in the schema registry.

The problem arises when I need to validate and produce the data to Kafka: how can I use my user_command_schema to validate my data before it is produced to Kafka?

The code will fail if I try to encode the data using the schema; it fails when running this code:

Describe the solution you'd like: In my example using JSON, I need producer.send_json to do the validation before sending the data to Kafka; also, in the case of a transactional producer/consumer, to validate the data before sending (for the producer) and after receiving (for the consumer).

tzachshabtay

Hi, so based on this: https://github.com/flavray/avro-rs/pull/99#issuecomment-1017195592, avro-rs is not really maintained anymore and the code was contributed to apache-avro, which has added support for named/recursive types (features from the avro spec that current avro-rs does not support).

If this project could switch to the maintained and more fully featured avro library that would be awesome. Thanks.

Versions

Find the latest versions by id

v2.1.0 - Jul 11, 2021

This release will focus on making the library easier to use. At least the following should be done:

  • Update the readme and maybe add some more use cases.
  • Have a reliable and faster CI, likely by moving to GitHub Actions.
  • Implement the standard Error trait for SRCError.
  • For protobuf, have an easier way to encode with single-message schemas, not requiring the full_name to be provided.
  • For async, have either an example or, in the library, a nice way to share the converter across multiple threads, so the users don't have to think about this. See https://github.com/gklijs/ksqlDB-GraphQL-poc/blob/main/rust-data-creator/src/data_producer/kafka_producer.rs but improve on that. It would be nice to not have to depend on rdkafka though.
  • Enable supplying a reqwest client for any additional setting/modifications needed for the schema registry calls.

v2.0.2 - Feb 20, 2021

Just updating the dependencies.

v2.0.1 - Nov 10, 2020

Maintenance release with mainly updated dependencies, making the blocking SR settings cloneable, and removing the need for the kafka_test feature to use both blocking and async in the same project.

v2.0.0 - Aug 23, 2020

  • Add json schema support.
  • Add protobuf support.
  • Support references in schema registry.
  • Add authentication, proxies, timeouts, etc., by using reqwest instead of curl.
  • Support async/non-blocking by default.
  • For Avro, make it possible to use the encode_struct function with primitive values.

Information - Updated Jun 23, 2022

Stars: 59
Forks: 20
Issues: 4
