Writing a simple Pulumi provider for Airbyte


In this quick tutorial I will go over Pulumi by writing a very simple Provider for Airbyte.

The simplified use case is this: I want to programmatically (infrastructure as code) create, update and delete Sources, Destinations and Connections in Airbyte instead of using the UI.

Airbyte has its own Terraform provider. However, at the time of writing, it is not entirely mature and is difficult to debug, particularly if you are not used to writing your own Terraform providers.

Airbyte offers a great and simple REST API which in fact is what is used under the hood by the Terraform provider.

Instead of using the Terraform provider, I was planning to use the API directly, as it is easier to try, configure, and debug when errors occur. You simply use any REST or HTTP client (I was planning on using requests) and invoke the different API endpoints. For example, to create a BigQuery Destination you would send an HTTP POST similar to the following (assuming you are connecting to api.airbyte.com):

import requests

url = "https://api.airbyte.com/v1/destinations"

payload = {
    "configuration": {
        "destinationType": "bigquery",
        "dataset_location": "US",
        "loading_method": { "method": "Standard" },
        "project_id": "xxx",
        "dataset_id": "abc",
        "credentials_json": "jjjjjjj"
    },
    "name": "destination"
}
headers = {
    "accept": "application/json",
    "content-type": "application/json"
}

response = requests.post(url, json=payload, headers=headers)

print(response.text)

That is all well and good: simple and easy to understand and debug. However, using Terraform has its advantages, one of which is state management. With the Terraform provider you declare your resources (Sources, Destinations and Connections) and the provider makes sure to create them in Airbyte. If you modify the Terraform configuration, the provider makes the corresponding modification (HTTP PATCH requests) in Airbyte. If you delete the resource from the Terraform file, the provider again notices this and requests the deletion of the component in Airbyte.
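To make the idea of state management concrete, here is a minimal toy sketch of what a state-aware tool does conceptually: compare the recorded state with the desired declaration and derive the create, update and delete actions. The function name plan and the dictionary layout are illustrative assumptions; this is not how Terraform or Pulumi are implemented internally.

```python
def plan(state, desired):
    """Compare recorded state with desired resources and list the actions needed.

    Both arguments map a resource name to its properties (plain dicts).
    """
    actions = []
    for name, props in desired.items():
        if name not in state:
            actions.append(("create", name))   # declared but not yet recorded
        elif state[name] != props:
            actions.append(("update", name))   # recorded, but properties changed
    for name in state:
        if name not in desired:
            actions.append(("delete", name))   # recorded but no longer declared
    return actions


# Hypothetical example data: one changed, one new and one removed resource.
state = {"jira": {"domain": "a.atlassian.net"}, "old_dest": {"dataset_id": "x"}}
desired = {"jira": {"domain": "b.atlassian.net"}, "bigquery": {"dataset_id": "abc"}}
print(plan(state, desired))
```

With plain requests calls, you would have to keep track of this bookkeeping yourself; a state-aware tool does it for you on every run.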

So I wanted to see if there was a way to combine the best of both worlds: the developer-friendly simplicity of the Python code with something similar to the state management from Terraform.

Enter Pulumi, which allowed me to do exactly that, in a very elegant, flexible and secure way.

From its home page Pulumi says:

Infrastructure as Code in Any Programming Language … Author infrastructure code using programming languages you know and love. Write statements to define infrastructure using your IDE with autocomplete, type checking, and documentation.

Awesome, that sounded promising. So I defined the goals of my PoC:

  • Write an Airbyte Pulumi “provider” that allows me to:
    • Create, Modify, Delete Sources
    • Create, Modify, Delete Destinations
    • Create, Modify, Delete Connections
  • Write a few resources to exercise the provider
    • In particular I selected a Jira source connected to a BigQuery destination.
  • Create some external resources using Pulumi and link them to the Airbyte specific resources
    • In particular create a dataset in BigQuery using Pulumi GCP provider.
    • Link that dataset to an Airbyte BigQuery destination
  • Make sure I can store the State in the cloud (GCS in particular)
  • Make sure the State doesn't contain plain text secrets or passwords.
  • Make sure I can run it programmatically with ease, as well as from the command line.

Setting up the project

brew install pulumi/tap/pulumi
mkdir pulumi_airbyte
cd pulumi_airbyte
pulumi login --local
pulumi new python

Follow the wizard.

You should end up with a directory content like:

Pulumi.dev.yaml Pulumi.yaml __main__.py requirements.txt venv/

Writing the provider

In Airbyte we define Sources, Destinations and Connections. We will create a very simple Pulumi provider that allows us to create these:

import pulumi
from pulumi.dynamic import Resource, ResourceProvider, CreateResult, UpdateResult
import requests

AIRBYTE_API_URL = "http://localhost:8006/v1"  # Update with your Airbyte API URL


class SourceProvider(ResourceProvider):
    def create(self, inputs):
        headers = {
            "accept": "application/json",
            "content-type": "application/json"
        }
        response = requests.post(f"{AIRBYTE_API_URL}/sources", json=inputs, headers=headers)
        response_data = response.json()
        source_id = response_data["sourceId"]
        outputs = {**inputs, "id": source_id}
        return CreateResult(id_=source_id, outs=outputs)

    def update(self, id, olds, news):
        update_data = {**news, "sourceId": id}
        requests.patch(f"{AIRBYTE_API_URL}/sources/{id}", json=update_data)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/sources/{id}", json={"sourceId": id})


class DestinationProvider(ResourceProvider):
    def create(self, inputs):
        response = requests.post(f"{AIRBYTE_API_URL}/destinations", json=inputs)
        response_data = response.json()
        destination_id = response_data["destinationId"]
        outputs = {**inputs, "id": destination_id}
        return CreateResult(id_=destination_id, outs=outputs)

    def update(self, id, olds, news):
        update_data = {**news, "destinationId": id}
        requests.patch(f"{AIRBYTE_API_URL}/destinations/{id}", json=update_data)
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/destinations/{id}", json={"destinationId": id})


class ConnectionProvider(ResourceProvider):
    def create(self, inputs):
        response = requests.post(f"{AIRBYTE_API_URL}/connections", json=inputs)
        response_data = response.json()
        connection_id = response_data["connectionId"]
        outputs = {**inputs, "id": connection_id}
        return CreateResult(id_=connection_id, outs=outputs)

    def update(self, id, olds, news):
        print("UPDATING WITH: ")
        response = requests.patch(f"{AIRBYTE_API_URL}/connections/{id}", json=news)
        print("RESPONSE: " + response.text + " " + str(response.status_code))
        return UpdateResult(outs=news)

    def delete(self, id, props):
        requests.delete(f"{AIRBYTE_API_URL}/connections/{id}", json={"connectionId": id})

We see we have defined 3 ResourceProvider types. In each of them we can see we defined the logic to create, update and delete resources.
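The three providers above differ only in the collection path and the name of the ID field. As a hedged sketch of how that shared logic could be factored out, the hypothetical AirbyteCrud helper below takes those two values plus a send callable (an assumption, standing in for the requests calls) so that it can be exercised without a running Airbyte instance. It is simplified: unlike the Source and Destination providers above, it does not add the ID to the PATCH body.

```python
class AirbyteCrud:
    """Create/update/delete for one Airbyte collection (hypothetical helper)."""

    def __init__(self, base_url, collection, id_field, send):
        self.base_url = base_url
        self.collection = collection  # e.g. "sources"
        self.id_field = id_field      # e.g. "sourceId"
        self.send = send              # callable(method, url, body) -> dict

    def _url(self, resource_id=None):
        url = f"{self.base_url}/{self.collection}"
        return url if resource_id is None else f"{url}/{resource_id}"

    def create(self, inputs):
        body = self.send("POST", self._url(), inputs)
        resource_id = body[self.id_field]
        # Same shape as the provider code: the inputs plus the new ID.
        return resource_id, {**inputs, "id": resource_id}

    def update(self, resource_id, news):
        self.send("PATCH", self._url(resource_id), news)
        return news

    def delete(self, resource_id):
        self.send("DELETE", self._url(resource_id), None)


# Stand-in transport that records calls instead of talking to Airbyte.
calls = []


def fake_send(method, url, body):
    calls.append((method, url))
    return {"sourceId": "abc-123"}


sources = AirbyteCrud("http://localhost:8006/v1", "sources", "sourceId", fake_send)
new_id, outs = sources.create({"name": "Personal Jira"})
sources.delete(new_id)
```

In a real provider, send could wrap requests.request(method, url, json=body) and call raise_for_status() on the response, so that a failed API call surfaces as an error instead of being silently recorded as a success.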

Next we need to define the Resource types that will use each of these providers. So we create a Source, a Destination and a Connection resource. We will also create a more specific JiraSource to demonstrate that, if we want, we can create full Python objects to specify our resources and their properties:

class Source(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(SourceProvider(), the_name, props, opts)


class Destination(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(DestinationProvider(), the_name, props, opts)


class Connection(Resource):
    def __init__(self, the_name, opts=None, **props):
        super().__init__(ConnectionProvider(), the_name, props, opts)


class JiraSource(Source):
    def __init__(self, the_name, *, workspace, name, api_token, domain, email, opts=None):
        configuration = {"workspaceId": workspace, "name": name,
                         "configuration": {"sourceType": "jira",
                                           "api_token": api_token,
                                           "domain": domain,
                                           "email": email}}
        super().__init__(the_name, opts, **configuration)

You can see we extend the Resource class from Pulumi.

We have now defined all the types we need. This is now a working provider. The next step is to define the actual resources.

We will define:

  • A BigQuery Dataset
  • A GCP Service account with permissions on the dataset
  • A Jira Airbyte Source
  • A BigQuery Airbyte Destination
  • A Jira -> BigQuery Connection.

Defining the GCP Resources

Pulumi offers many ready-made providers. In particular, we will use the out-of-the-box GCP provider.

Let’s define the dataset and service account in a file named gcp_resources.py

import pulumi
import pulumi_gcp as gcp
from dotenv import load_dotenv

load_dotenv()


def gcp_resources():
    config = pulumi.Config("pulumi_airbyte")

    # Service account that will be granted ownership of the dataset
    bqowner = gcp.serviceaccount.Account("bqowner2",
                                         account_id="bqowner2",
                                         project=config.require_secret('gcpProject'))

    # BigQuery dataset that the Airbyte destination will write to
    dataset = gcp.bigquery.Dataset("dataset",
                                   project=config.require_secret('gcpProject'),
                                   dataset_id="airbyte_pulumi_2",
                                   friendly_name="test",
                                   description="This is a test description",
                                   location="europe-west2",
                                   default_table_expiration_ms=3600000,
                                   labels={
                                       "env": "default",
                                   },
                                   accesses=[
                                       gcp.bigquery.DatasetAccessArgs(
                                           role="OWNER",
                                           user_by_email=bqowner.email,
                                       ),
                                   ])

    pulumi.export("jira_dataset", dataset.id)
    pulumi.export("jira_servce_account", bqowner.id)
    return {"dataset": dataset}

Next, let’s define the Airbyte resources in a file named resources.py:

import pulumi
from provider.airbyte_provider import Source, Destination, Connection, JiraSource
from dotenv import load_dotenv
from gcp_resources import gcp_resources

load_dotenv()


def pulumi_execution(gcp_resources):
    config = pulumi.Config("pulumi_airbyte")
    dataset = gcp_resources["dataset"]
    workspace_id = config.require('airbyte_workspace')

    # Jira Source configuration
    jira_source_config = {"workspaceId": workspace_id, "name": "Personal Jira 8",
                          "configuration": {"sourceType": "jira",
                                            "api_token": config.require_secret('jiraApiToken'),
                                            "domain": config.require_secret('jira_domain'),
                                            "email": config.require_secret('jira_email')}}

    # BigQuery Destination configuration
    bigquery_destination_config = {"workspaceId": workspace_id, "name": "BigQuery Destination 3xxx",
                                   "configuration": {
                                       "destinationType": "bigquery",
                                       "dataset_location": "europe-west2",
                                       "project_id": config.require_secret('gcpProject'),
                                       # Links the destination to the dataset created by the GCP provider
                                       "dataset_id": dataset.dataset_id,
                                       "credentials_json": config.require_secret('serviceAccountJson'),
                                       "loading_method": {
                                           "method": "Standard"
                                       }
                                   }}

    # Create Jira Source
    jira_source = Source("jiraSource", **jira_source_config)

    # Create BigQuery Destination
    bigquery_destination = Destination("bigQueryDestination", **bigquery_destination_config)

    # Create Connection linking Jira Source and BigQuery Destination
    connection_config = {
        "name": "The Connection 5",
        "sourceId": jira_source.id,
        "destinationId": bigquery_destination.id,
        "schedule": {"scheduleType": "manual"},
        "namespaceDefinition": "destination",
        "namespaceFormat": None,
        "nonBreakingSchemaUpdatesBehavior": "ignore",
        "configurations": {"streams": [
            {"syncMode": "full_refresh_overwrite",
             "name": "issues"}
        ]}}
    connection = Connection("jiraToBigQueryConnection2", **connection_config)

    pulumi.export("jira_source_id", jira_source.id)
    pulumi.export("bigquery_destination_id", bigquery_destination.id)
    pulumi.export("connection_id", connection.id)

A thing to note:

  • Note how the pulumi_execution function receives gcp_resources as a parameter. This is used to link the GCP resources with the Airbyte resources:
    • In the Destination config, the line "dataset_id": dataset.dataset_id is what does the linking.

We’ll now see how the two resource files are linked together.

For standard execution, Pulumi will look for a __main__.py file. Let’s define it here:

from resources import pulumi_execution  
from gcp_resources import gcp_resources  
  
gcp_stuff = gcp_resources()  
pulumi_execution(gcp_stuff)

This is all the code needed. A couple of pieces to note in the code chunks are the use of:

config.require_secret('jiraApiToken')

and

config.require('airbyte_workspace')

These values are set using the Pulumi CLI. In particular:

pulumi config set pulumi_airbyte:airbyte_workspace "xxxxxxx"

for normal, non-secret configuration values, and:

pulumi config set --secret pulumi_airbyte:jira_email "xxx@gmail.com"

for secret, encrypted configuration values.

Then we execute pulumi up to provision. (The following output is on my machine after I already created them, so you can see pulumi correctly says that there is nothing to change):

pulumi up
Enter your passphrase to unlock config/secrets
    (set PULUMI_CONFIG_PASSPHRASE or PULUMI_CONFIG_PASSPHRASE_FILE to remember):  
Enter your passphrase to unlock config/secrets
Previewing update (dev2):
     Type                 Name                 Plan     Info
     pulumi:pulumi:Stack  pulumi_airbyte-dev2           1 warning

Diagnostics:
  pulumi:pulumi:Stack (pulumi_airbyte-dev2):
    warning: unable to detect a global setting for GCP Project.
    Pulumi will rely on per-resource settings for this operation.
    Set the GCP Project by using:
        `pulumi config set gcp:project <project>`

Resources:
    9 unchanged

Do you want to perform this update? yes
Updating (dev2):
     Type                 Name                 Status     Info
     pulumi:pulumi:Stack  pulumi_airbyte-dev2             1 warning

Diagnostics:
  pulumi:pulumi:Stack (pulumi_airbyte-dev2):
    warning: unable to detect a global setting for GCP Project.
    Pulumi will rely on per-resource settings for this operation.
    Set the GCP Project by using:
        `pulumi config set gcp:project <project>`

Outputs:
    bigquery_destination_id: "5381d0a9-9fbe-49f6-873a-416cf956b4f4"
    connection_id          : "343c3846-d88d-444f-8697-0586cb8e80b7"
    jira_dataset           : "projects/[secret]/datasets/airbyte_pulumi_2"
    jira_servce_account    : "projects/[secret]/serviceAccounts/bqowner2@[secret].iam.gserviceaccount.com"
    jira_source_id         : "42bd42fc-9e74-4854-a099-6523bda38df1"

Resources:
    9 unchanged

Duration: 1s

In the Outputs section of the output above you can also see the values of all the pulumi.export calls in the code.
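One of the PoC goals was to run this programmatically as well as from the command line. A minimal sketch of that, assuming Pulumi's Automation API (the pulumi.automation module) and the existing project directory, could look like the following. The function names run_up and summarize_outputs, the default work_dir and the use of the dev2 stack name are illustrative assumptions, and I have not run this against the project above.

```python
def summarize_outputs(outputs):
    """Return {name: value}, masking entries that are flagged as secret."""
    return {
        name: "[secret]" if out.secret else out.value
        for name, out in outputs.items()
    }


def run_up(stack_name="dev2", work_dir="."):
    """Run the equivalent of `pulumi up` for an existing project directory."""
    # Imported lazily so the helper above can be used without Pulumi installed.
    from pulumi import automation as auto

    # Assumes PULUMI_CONFIG_PASSPHRASE is set in the environment, since a
    # passphrase-protected stack cannot prompt for it here.
    stack = auto.create_or_select_stack(stack_name=stack_name, work_dir=work_dir)
    result = stack.up(on_output=print)  # stream the engine output as it arrives
    return summarize_outputs(result.outputs)
```

Calling run_up() from a script or a scheduled job would then return the same values that appear in the Outputs section, with secret entries masked.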

The full example is here. Feel free to copy, clone, etc.
