How to develop a custom connector with Airbyte

Airbyte is a user-friendly tool that enables companies to gather data from various sources and load it into a variety of locations for analytics and business intelligence. In this blog, we demonstrate how to use Trino as a source connector for Airbyte.

GraphQL has a role beyond API Query Language- being the backbone of application Integration
background Coditation

How to develop a custom connector with Airbyte

Airbyte enables companies to gather data from various sources and load it into a variety of locations for analytics and business intelligence. It offers a user-friendly and expandable tool that can link to different data sources, transform the data, and load it into various data storage services, including data warehouses, databases, and cloud storage.
In addition to connecting to numerous data sources like databases, APIs, file storage, and SaaS programs, Airbyte also supports a broad variety of data destinations like Snowflake, BigQuery, Redshift, and others. Additionally, it provides pre-built connectors, making it simple to link to various data sources and destinations without putting much effort into development.
Due to its distributed and parallelized architecture, Airbyte is built to manage large amounts of data. Additionally, it is simple to use and has a user-friendly web interface and flexible data integration framework that can be tailored to meet various use cases.

Prerequisites and the need for custom connectors in Airbyte:

We frequently want to gather data from various sources to a centralized location so that analysis can be performed on that data and later used to back certain decisions that the company makes or wants to take.

Airbyte custom connectors
Let’s understand the need for Airbyte and custom connectors through a story!

There was a period when HRmon was a startup. HRmon set out to change the way companies handle human resources by offering a cloud-based platform for managing employee records, payroll, and benefits.
To accomplish this, HRmon is required to integrate with a variety of data sources, including various HR systems, financial software, and APIs. They were having difficulty keeping up with the complexity of creating and maintaining custom integrations for all of these various sources.
That's when they came across Airbyte, a modern and open-source data integration platform with pre-built connectors for a wide range of common data sources, including many of the systems they needed to integrate with.
With Airbyte, HRmon was able to easily integrate their platform with all of the data sources they needed, without having to build custom integrations from scratch. They were able to connect to their customers' data sources quickly, saving them time and resources.
However, there were still some data sources that were not supported by Airbyte's pre-built connectors. That's when they decided to build a custom connector using Airbyte's Custom Connector feature. The development team was able to build the custom connector quickly and easily, thanks to Airbyte's SDK.
With the custom connector in place, HRmon was able to connect to their customers' unique data sources, providing a more personalized and flexible solution. Their customers loved the flexibility and ease of use of the platform, and HRmon's customer base continued to grow.
In the end, HRmon was able to deliver a powerful and flexible platform to manage HR, payroll, and benefits, thanks to Airbyte's pre-built connectors and Custom Connector feature. They were able to save time and resources while providing their customers with a flexible and personalized solution that met their unique needs.
As you see, users can build their custom connectors to link to data sources or destinations that the pre-built connectors do not support. These connectors are especially helpful for companies that need specialized integration methods and have private or uncommon data sources. Utilizing the Airbyte Connector Development Kit, developers can create their custom connections, which Airbyte supports. (CDK). Programming skills are typically needed to handle the authentication, data extraction, and data loading procedures particular to the data source or destination when creating custom connectors. But Airbyte's connector development kit provides a streamlined structure that facilitates developers' development processes.
Once users build custom connectors, they can integrate them with Airbyte's data integration pipeline, alongside the pre-built connectors.

We are going to develop Trino as a Source Connector for Airbyte, but what is Trino?

Trino is a distributed SQL query engine that allows users to perform complex and interactive queries across multiple data sources. It was originally developed by Facebook and is now an open-source project that can handle petabytes of data at a fast and scalable rate.

The Trino query engine comprises several parts that work together to deliver high-performance SQL query processing. The Coordinator component is the central part that manages query execution, planning, and result coordination. The Worker nodes execute the tasks assigned by the Coordinator, while the Connectors act as Trino's interface to various data sources, including relational databases, NoSQL databases, and file systems.

Trino's distributed architecture enables horizontal scaling, which makes it ideal for processing massive amounts of data. Moreover, its ability to integrate with a broad range of data sources makes it a versatile tool for data querying and analysis.

How do I create a Trino Source Connector for Airbyte:

1. Generate a Source Connector Template using Airbyte’s CDK (Connector Development Kit)
   This can be achieved through the following commands:


cd airbyte-integrations/connector-templates/generator./generate.sh

2. From the prompt choose:


[PLOP] Please choose a generator. 

  1. Python HTTP API Source - Generate a Source that pulls data from a synchronous HTTP API. 
  2. Configuration Based Source - Generate a Source that is described using a low code configuration file 
  3. Python Singer Source - Generate a Singer-tap-based Airbyte Source. 
❯ 4. Python Source - Generate a minimal Python Airbyte Source Connector that works with any kind of data source. Use this if none of the other Python templates serve your use case. 
  5. Java JDBC Source - Generate a minimal Java JDBC Airbyte Source Connector. 
  6. Generic Source - Use if none of the other templates apply to your use case.
  
  

3. Give the source a name. For example: trino. This will create a new folder named source-trino, with the airbyte source connector code template under airbyte-integrations/connectors folder.

4. The Template contains a few important files that we require to modify for our connector to work, namely: 

  • spec.yaml
  • source.py file
  • source_definitions.yaml
  • source_specs.yaml

5. The spec.yaml file contains the specifications about the source connector, which includes the mandatory fields for the connector to work, and these fields are rendered in the UI as well. In this case:

a. Hostname
b. Username
c. Catalog
d. Schema
The above are required fields to work with Trino as a connector.

6. The source_specs.yaml file contains information regarding the docker image name and other information similar to the spec.yaml file. We need to configure the docker-image name as -


dockerImage: "airbyte/source-trino:0.1.0"

7. Open airbyte-config/init/src/main/resources/seed/source_definitions.yaml. You'll find a list of all the connectors that Airbyte displays in the UI. Pattern match to add your own connector. Make sure to generate a new unique UUIDv4 for the sourceDefinitionId field. You can get one here : UUID_Generator

Note: Modifications made to the source_definitions.yaml will only be picked-up the first time you start Airbyte, or when you upgrade Airbyte, or if you entirely wipe your instance of Airbyte and start from scratch.

8. The source.py contains the main code where the magic happens, this is where we write different functions: read(), discover(), and check() to use the connector properly with Airbyte.

9. Once we understand the structure of the Template, we need to set up the Python Virtual Environment to develop and test our connector code. To do this:


python -m venv .venv

And activate our Virtual Environment with the:


source .venv/bin/activate

10. We are required to install the requirements for the connector to work, which includes the airbyte-cdk modules, and trino python client.

By default, the template has a requirements.txt file, and to add extra requirements, we are required to edit the setup.py file. Once our requirements are planned out and ready to be installed we install it through pip, using the:


pip install -r requirements.txt

11. When the requirements are installed, we perform Step 5 and update the spec file.

12. We set a sample configuration based on the spec.yaml, inside secrets/config.json, For example:


{
  "hostname": "localhost",
  "port": "8080",
  "username": "test",
  "catalog": "postgres",
  "schema":  "public"
}

13. Next, we edit the source.py file to meet our needs for the connector, here are the required functions: 

a. Check: The check step verifies whether the connector can establish a connection with the underlying data source using the user-provided configuration
b. Discover: The discover phase identifies the various data streams that the connector can produce. 
c. Read: the read step involves extracting data from the underlying data source

For example, our check function will contain the following:


def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
       """ Tests if the input configuration can be used to successfully connect to the integration """
       try:
           conn = connect(
               host=config["hostname"], #as per spec.yaml
               port=config["port"], #as per spec.yaml
               user=config["username"], #as per spec.yaml
               catalog=config["catalog"], #as per spec.yaml
               schema=config["schema"] #as per spec.yaml
           )
           cur = conn.cursor()
           conn.close()
           return AirbyteConnectionStatus(status=Status.SUCCEEDED)
       except Exception as e:
           return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred => {str(e.__class__)}: {str(e)}")
           
           

Similarly, the discover and read functions are to be written to perform their desired action

14. In discover(), we add streams = [ ]  which will be a list of AirbyteStream objects. Let's walk through what each field in a stream means.

  • name - The name of the stream.
  • supported_sync_modes - This field lists the type of data replication that this source supports. The possible values in this array include FULL_REFRESH (docs) and INCREMENTAL (docs).
  • source_defined_cursor - If the stream supports INCREMENTAL replication, then this field signals whether the source can figure out how to detect new records on its own or not.
  • json_schema - This field is a JsonSchema object that describes the structure of the data. Notice that each key in the properties object corresponds to a column name in our database table

15. Read() function takes a parameter ConfiguredAirbyteCatalog. Just as with the AirbyteCatalog the ConfiguredAirbyteCatalog contains a list. This time it is a list of ConfiguredAirbyteStream (instead of just AirbyteStream). It is basically filtering out the schemas for users according to the data required. We add it in a new folder sample_files and create a configured_catalog.json for testing.

Example of how configuredAirbyteCatalog looks like:


{
  "streams": [
    {
      "stream": {
        "name": "customer",
        "json_schema": {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "type": "object",
          "properties": {
            "custkey": {
              "type": "integer"
            },
            "name": {
              "type": "string"
            },
            "address": {
              "type": "string"
            },
            "acctbal": {
              "type": "number"
            }
          }
        },
        "supported_sync_modes": [
          "full_refresh",
          "incremental"
        ]
      },
"sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "nation",
        "json_schema": {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "type": "object",
          "properties": {
            "nationkey": {
              "type": "integer"
            },
            "name": {
              "type": "string"
              
              

To test our connection, we can invoke the check function using the following:


python main.py check --config secrets/config.json

On a successful connection, the output is:


{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

17. Similarly, the discover and read functions are tested to ensure that they work according to our requirements.

18. Once, our connector is working locally, we are ready to build the docker container for our connector, using:


docker build . -t airbyte/source-trino:0.1.0

Note : `source-trino` is the name of  airbyte_source and `0.1.0` is the dockerImageTag mentioned in the source_definitions.yaml file.

Once our image is built locally, we can build the entire application using the `gradle-build` command from your application directory.


VERSION=0.40.32 ./gradlew generate-docker

Note: Before gradle build, make sure to docker-compose down and remove all the images , build the trino image again using docker build command mentioned above, and then run this command. After successful build of gradle, you can run:


docker-compose up

Performing the above steps adds Trino as a Source Connector within Airbyte. Now, we can easily sync up Trino with other available Destinations like Postgres, MySQL etc.

Let’s look at some Benchmarks for the amount of data and time taken:

1. Trino as a Source and Postgres as Destination

Trino as a source

2. File (pre-built connector) and Postgres as Destination

pre-built connectors

3. Zendesk Support as Source Connector and Postgres as Destination

ZenDesk support as source connector

Different connectors can be set up similarly into Airbyte, hence making it flexible and a great place for integration connections!

Hi, I am Jenisha Yadav. I have honed my expertise in Python and Java for 2.5 years and am passionate about designing and implementing complex algorithms and delivering high-quality software products.

Want to receive update about our upcoming podcast?

Thanks for joining our newsletter.
Oops! Something went wrong.