
Welcome back to our series where we share a step-by-step walkthrough of our data stack development process. If you have not already done so, we highly recommend checking out Part 1 of this series to familiarize yourself with the various components of the modern data stack and to understand which components make the most sense for you and your business.

In Part 2 of this series, we will detail how to configure your pipeline to deliver raw data to your warehouse as effectively as possible. The pipeline is a critical component of your data stack, serving as the conduit through which raw data is ingested, processed, and stored in your data warehouse. A well-designed pipeline is essential: it not only impacts cost management but also affects the ease of future maintenance and expansion of your data stack. Proper configuration ensures seamless data flow, reduces latency, and maintains data quality and accessibility, empowering your team to generate insights quickly and accurately! Ultimately, there are many suitable approaches to data pipeline development; what matters most is that you execute your strategy with an eye toward how your needs may grow in the future.

Initializing the Data Warehouse

Snowflake was our data warehouse of choice for our case study project.

Previously, we outlined what goes into deciding between data warehouse providers and making the best choice for your stack. While there are plenty of large cloud data warehouse platforms on the market, for our project we elected to use Snowflake. To create your Snowflake instance, simply register on their website with your email and organization details to start your free 30-day trial, which will transfer seamlessly into a full account when the trial period ends. Once you have access, navigate to the Databases section and create a new database with a unique name. This will house the raw and production schemas for your stack!
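
If you prefer scripting this step over clicking through the UI, the same objects can be created with SQL. Below is a minimal sketch using the snowflake-connector-python package; the account identifier, credentials, and the database and schema names (ANALYTICS_DB, RAW, PRODUCTION) are placeholders for illustration, not the names from our project.

```python
# pip install snowflake-connector-python
import snowflake.connector

# Placeholder credentials: substitute your own account identifier and login.
conn = snowflake.connector.connect(
    account="your_account_identifier",
    user="your_admin_user",
    password="your_password",
)
cur = conn.cursor()

# Create the database that will house the stack, plus one schema for raw
# ingested data and one for production-ready models.
cur.execute("CREATE DATABASE IF NOT EXISTS ANALYTICS_DB")
cur.execute("CREATE SCHEMA IF NOT EXISTS ANALYTICS_DB.RAW")
cur.execute("CREATE SCHEMA IF NOT EXISTS ANALYTICS_DB.PRODUCTION")

cur.close()
conn.close()
```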

One of the key benefits of Snowflake is that it separates compute from storage, allowing you to scale each independently to best control the data warehouse and minimize costs. To take advantage of this, we recommend creating at least two virtual warehouses that separate the data manipulation processes in the staging layer from the final dashboard processes in the production layer. This way you can scale each warehouse separately to provide the compute power each workload needs, without imposing a single compute configuration where it is not needed, and it makes cost management much easier. Another approach is to create a separate warehouse for each core component of your data pipeline (e.g. one for Airbyte or Fivetran, one for your transformation layer if you use dbt, one for your BI tool, etc.).

The next step is to actually create the warehouse. To do so, select the ‘Create Warehouse’ button in the top right-hand corner of the Warehouses section of the admin panel. Here, set a unique name to avoid confusion between warehouses and select a warehouse size; this is the setting that most directly affects compute costs.

As a quick aside on pricing, Snowflake bills credits based on the ‘size’ of the warehouse, i.e. how much compute power is allocated while it runs. Thus, you should consider a slightly bigger warehouse for a compute-heavy process (such as the ETL layer of the stack) while electing for a smaller warehouse for the production layer, which will mostly run simpler queries. Unless you are dealing with a ton of data right out of the gate, it likely makes sense to start with an X-Small warehouse and scale up from there if performance suffers (and the improvement is worth the trade-off in cost).
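
For reference, the split described above can also be expressed in SQL. The sketch below is illustrative, not prescriptive: the warehouse names and sizes are placeholders, and the AUTO_SUSPEND/AUTO_RESUME settings are there to keep idle warehouses from accruing credits.

```python
# pip install snowflake-connector-python
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account_identifier", user="your_admin_user", password="your_password"
)
cur = conn.cursor()

# Illustrative names and sizes: a SMALL warehouse for ingestion/transformation
# work and an X-SMALL warehouse for simpler production/BI queries.
cur.execute("""
    CREATE WAREHOUSE IF NOT EXISTS STAGING_WH
      WAREHOUSE_SIZE = 'SMALL'
      AUTO_SUSPEND = 60
      AUTO_RESUME = TRUE
""")
cur.execute("""
    CREATE WAREHOUSE IF NOT EXISTS PRODUCTION_WH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 60
      AUTO_RESUME = TRUE
""")

cur.close()
conn.close()
```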

The final step in optimizing your warehouse setup is to create users and assign appropriate privileges. The account you register with is designated as the ‘organization’ admin by default, but it’s essential to create unique users for each component of your stack — granting them access only to specific portions of your Snowflake ecosystem. Defining roles with tailored privileges on databases, schemas, and tables allows you to establish a clear user hierarchy and restrict access as needed. This approach ensures that users only access the data they require, reducing the risk of unauthorized access and enhancing security.

To create a user, navigate to the admin panel, select “Create New User,” and specify a unique name, email address (if it’s an actual user rather than an account for a specific stack component), and the roles and privileges to assign under “Advanced User Options.” At this stage, assign each user to the appropriate warehouse to effectively compartmentalize your stack while ensuring that all tools can access the necessary data. Be sure to document each user’s credentials, as they will be required when connecting the various components of your stack to the Snowflake ecosystem.
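
As a rough illustration of the role-per-component idea, the sketch below creates a role and a service user for the ingestion tool and grants it access only to the raw schema and the staging warehouse. All names here are placeholders; your own hierarchy will depend on the components in your stack.

```python
# pip install snowflake-connector-python
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account_identifier", user="your_admin_user", password="your_password"
)
cur = conn.cursor()

# Placeholder role and user for the ingestion tool (e.g. Airbyte). The role can
# use the staging warehouse and write to the RAW schema, and nothing else.
statements = [
    "CREATE ROLE IF NOT EXISTS INGESTION_ROLE",
    "GRANT USAGE ON WAREHOUSE STAGING_WH TO ROLE INGESTION_ROLE",
    "GRANT USAGE ON DATABASE ANALYTICS_DB TO ROLE INGESTION_ROLE",
    "GRANT ALL ON SCHEMA ANALYTICS_DB.RAW TO ROLE INGESTION_ROLE",
    "CREATE USER IF NOT EXISTS AIRBYTE_USER "
    "PASSWORD = 'choose_a_strong_password' "
    "DEFAULT_ROLE = INGESTION_ROLE "
    "DEFAULT_WAREHOUSE = STAGING_WH",
    "GRANT ROLE INGESTION_ROLE TO USER AIRBYTE_USER",
]
for stmt in statements:
    cur.execute(stmt)

cur.close()
conn.close()
```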

Configuring the Pipeline

Once you’ve set up your warehouse, the next step is to build the data pipeline that connects your various sources. As we mentioned in the previous part of our series, Fivetran and Stitch are excellent data integration tools with plenty of out-of-the-box connectors. However, for stacks like ours that connect to a unique API, Airbyte’s ability to create custom connectors while still providing high-quality pre-built connectors makes it an exceptional choice.

To create your pipeline, start by establishing connectors to your source (via an API) and destination (the data warehouse).

Using Pre-Built Connectors

If you are using pre-built connectors, the process for sources and destinations is similar. In the Airbyte UI, click “New Destination” under the Destination tab, select the connector for your warehouse provider (e.g., Snowflake), and follow the guided walkthrough to enter your warehouse credentials, which can be found in the Snowflake admin panel. Provide the host URL, the database and warehouse names for the staging layer, and a schema name for raw data storage (Airbyte will create the schema in Snowflake if it doesn’t exist). For authentication, use the Snowflake username and password you created for this purpose.

Sample authentication screen when setting Snowflake up as a destination in Airbyte.

Creating a Custom Connector

If you don’t see a pre-built connector for the source you have in mind, don’t fret! This is where Airbyte shines.

While there are multiple ways to create a custom connector with Airbyte, for this exercise we will focus on the Airbyte Connector Builder tool. The first step is to assign a name to your connector and designate the base API URL along with the proper credentials (API key, OAuth, session token, etc.) under the global configuration tab. This will serve as the base for each stream (API endpoint) in the connector, so if there is data you need for your stack that falls under a different base API, you will need to create a separate connector for that API.

Airbyte structures all of its connectors into ‘streams’, each of which is materialized as a table in your warehouse. For a custom connector, you have to configure a stream for each endpoint of the API that you have deemed necessary.

To create a stream, press the plus button on the left tab of the Builder and enter the desired endpoint along with a meaningful name for the stream; this will be the name of the table in the destination warehouse. Once you hit enter, Airbyte will take you to the stream page, where you define all the parameters and settings needed to access this endpoint. Also of note on this page is the testing tab on the right-hand side, which allows you to pass default values and test the stream’s output.

First, ensure that the HTTP method is set to ‘GET’ and that the response format matches what you intend (JSON being the most common default). From there, navigate through the various settings we have detailed below to further configure the stream and ensure that the output returns what you expect.
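
Under the hood, each stream boils down to an authenticated GET request against the base URL plus the endpoint path, with the JSON response parsed into records. The hedged sketch below approximates that behavior in plain Python with the requests library; the base URL, endpoint, and bearer-token header are made-up placeholders rather than the actual API from our case study.

```python
# pip install requests
import requests

# Hypothetical values standing in for the Connector Builder's global config.
BASE_URL = "https://api.example.com/v1"             # base API URL (placeholder)
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # auth credentials (placeholder)

# One "stream" corresponds to one endpoint: an HTTP GET returning JSON.
response = requests.get(f"{BASE_URL}/games", headers=HEADERS, timeout=30)
response.raise_for_status()

data = response.json()
print(data)  # inspect the raw JSON before wiring up record selection
```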

Advanced Stream Configuration

Within Airbyte’s Connector Builder UI there are a number of tools at your disposal for manipulating the API calls and responses that are processed under the hood.

Firstly, the ‘Request Options’ section covers the standard metadata attached to any HTTP request. Here you can define query parameters, headers, or request body fields as key-value pairs to include with each call. Note that these values are attached to the request itself rather than injected directly into the endpoint URL path.
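
In plain-Python terms, the ‘Request Options’ section corresponds to the params, headers, and body you would otherwise attach to a request yourself. A hedged sketch with made-up parameter names:

```python
# pip install requests
import requests

BASE_URL = "https://api.example.com/v1"  # placeholder

# Equivalent of the Request Options: key-value pairs sent as query parameters
# and headers rather than hard-coded into the URL path.
params = {"season": "2023", "expand": "teams"}      # hypothetical query parameters
headers = {"Authorization": "Bearer YOUR_API_KEY"}  # hypothetical header

response = requests.get(f"{BASE_URL}/games", params=params, headers=headers, timeout=30)
response.raise_for_status()
print(response.url)  # the parameters travel with the request, not the base path
```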

Another useful tool, the ‘Record Selector’ section, defines how the data is extracted from the JSON (or other format) response. This is useful when the records in the response are contained in a nested field, or when there is a filter you want to apply while extracting records. For the field path, just enter the sequence of field names in the response that points to the records. For example, in our games stream we wanted to ignore the response metadata and only extract the data from the ‘games’ field, as seen below. From there, you can apply a filter on the data by wrapping an expression in double curly braces {{ }}. For example, if we only wanted games with an id greater than 150, that filter would look like: {{ games.id > 150 }}.
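
Conceptually, the record selector walks the field path down into the nested response and then keeps only the records that pass the filter. The sketch below mirrors the games example above with an invented response shape; it is an approximation of the behavior, not Airbyte’s implementation.

```python
# A nested response shaped like our hypothetical games endpoint.
response_json = {
    "metadata": {"page": 1, "total": 3},
    "games": [
        {"id": 101, "homeTeam": "BOS", "awayTeam": "NYR"},
        {"id": 151, "homeTeam": "TOR", "awayTeam": "MTL"},
        {"id": 152, "homeTeam": "CHI", "awayTeam": "DET"},
    ],
}

# Field path: follow the sequence of field names down to the records.
field_path = ["games"]
records = response_json
for field in field_path:
    records = records[field]

# Record filter: keep only games with an id greater than 150.
selected = [record for record in records if record["id"] > 150]
print(selected)  # the two records with ids 151 and 152
```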

The ‘Transformations’ section interacts with the raw response, allowing you to add or remove fields from records. To manipulate a field, define its path as in the ‘Record Selector’, specifying each field step by step. The * character can be used as a pseudo-RegEx to match fields with a certain prefix (e.g., s* matches all fields starting with “s”), while prefixing with ** matches all instances of a field at any level. To delete a field, simply define its path. When adding a field, specify the value to assign: it can be a static string or integer, a variable from the stream, or data extracted from another part of the response. For example, you might remove all fields starting with “a,” “g,” or “h,” and add a date field populated by a stream variable.
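
The transformation step amounts to editing each record before it is written out: drop fields whose names match a pattern, then add new computed fields. A rough Python equivalent of the example above, with illustrative field names and simple wildcard matching standing in for the * pseudo-RegEx:

```python
import fnmatch
from datetime import datetime, timezone

record = {
    "id": 151,
    "awayTeam": "MTL",
    "homeTeam": "TOR",
    "gameState": "FINAL",
    "attendance": 18000,
}

# Remove all fields starting with "a", "g", or "h" (wildcard match on the key).
patterns_to_remove = ["a*", "g*", "h*"]
record = {
    key: value
    for key, value in record.items()
    if not any(fnmatch.fnmatch(key, pattern) for pattern in patterns_to_remove)
}

# Add a field populated from a stream variable (today's date as a stand-in).
record["date"] = datetime.now(timezone.utc).date().isoformat()

print(record)  # only 'id' survives the removal, plus the new 'date' field
```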

The ‘Pagination’ section defines how the stream should handle endpoints that paginate their responses. Here you can define how the endpoint splits its response, either by offset, page, or cursor, depending on which keywords the endpoint uses. To implement this, define the paging parameter and then specify how that parameter should be injected into the request. If injecting into the query parameters or headers, simply reference the parameters you declared previously. If you are injecting directly into the path, ensure that the stream’s endpoint URL contains a variable wrapped in double curly braces.
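
As a mental model, offset-based pagination looks roughly like the loop below: keep requesting pages, advancing the offset parameter, until the endpoint returns fewer records than the page size. The endpoint, parameter names, and page size are placeholders under assumed API behavior.

```python
# pip install requests
import requests

BASE_URL = "https://api.example.com/v1"             # placeholder
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # placeholder
PAGE_SIZE = 100

all_records = []
offset = 0
while True:
    # The paging parameter is injected into the query parameters of each request.
    response = requests.get(
        f"{BASE_URL}/games",
        headers=HEADERS,
        params={"limit": PAGE_SIZE, "offset": offset},
        timeout=30,
    )
    response.raise_for_status()
    page = response.json().get("games", [])  # assuming records live under 'games'
    all_records.extend(page)

    if len(page) < PAGE_SIZE:  # last page reached
        break
    offset += PAGE_SIZE

print(f"Fetched {len(all_records)} records across all pages")
```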

The ‘Incremental Sync’ tool is ideal for endpoints with large data volumes, enabling you to pull only new or updated records since the previous sync. This is achieved by specifying a cursor field (in the JSON response), the datetime format, and a start and end date. These dates can be static or user-defined, with the end date optionally set to ‘Now’ (the time of the sync). If user input is chosen, default values can be tested by setting them in the left tab. Next, configure how the date cursor is included in the request. It can be injected directly into the API endpoint using {{stream_interval.start_time}} or added to the HTTP request as a parameter or header. The example below shows how we configured our games stream to retrieve the NHL games played every day since the user-defined start date. For more advanced tuning of the date partitioning, you can adjust the cursor interval and granularity in the Advanced dropdown.
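
The incremental sync behaves roughly like the sketch below: split the window between the start date (or the last saved cursor value) and ‘Now’ into daily slices and issue one request per slice, injecting each slice’s start into the request. The endpoint and date parameter are placeholders, and in practice Airbyte manages the saved cursor state for you.

```python
# pip install requests
from datetime import date, timedelta
import requests

BASE_URL = "https://api.example.com/v1"             # placeholder
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # placeholder

start_date = date(2023, 10, 1)  # user-defined start date (or the saved cursor value)
end_date = date.today()         # 'Now' at the time of the sync

records = []
current = start_date
while current <= end_date:
    # Rough equivalent of injecting {{ stream_interval.start_time }} into the request.
    response = requests.get(
        f"{BASE_URL}/games",
        headers=HEADERS,
        params={"date": current.isoformat()},  # hypothetical date parameter
        timeout=30,
    )
    response.raise_for_status()
    records.extend(response.json().get("games", []))
    current += timedelta(days=1)  # daily cursor granularity

print(f"Pulled {len(records)} records since {start_date.isoformat()}")
```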

Much like the Incremental Sync, the ‘Parent Sync’ tool allows you to connect one of your other streams as an input into the current stream. This is incredibly useful when one of your endpoints relies on an ID or some other field that is returned by another endpoint. For example, our API has an endpoint that returns every game played in the NHL (seen above) and another endpoint that returns the detailed box score and play-by-play information for a given game id. Thus, we incorporated a parent stream into our box scores stream to pull the game IDs from our games endpoint and retrieve the box scores for each of them, as seen below. To implement this, select the parent stream you wish to invoke and the key (field) from that stream to inject into the current stream. Then, define how this parent key will be injected into the request: either through the query parameters or body by selecting that option, or by nesting it directly in the endpoint URL path. To add it to the path, set a variable name in the Current Parent Key Value Identifier field and reference it in the URL path using the {{ stream_partitions.variable_name }} syntax.
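
The parent-stream pattern works like the nested loop below: pull the IDs from the parent endpoint first, then make one child request per ID, substituting the ID into the child endpoint’s path (the equivalent of referencing it via {{ stream_partitions.variable_name }}). Both endpoints and the response shapes are placeholders rather than the real NHL API paths.

```python
# pip install requests
import requests

BASE_URL = "https://api.example.com/v1"             # placeholder
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # placeholder

# Parent stream: fetch the list of games and pull out the parent key (game id).
games_response = requests.get(f"{BASE_URL}/games", headers=HEADERS, timeout=30)
games_response.raise_for_status()
game_ids = [game["id"] for game in games_response.json().get("games", [])]

# Child stream: one request per parent key, with the id nested in the URL path.
box_scores = []
for game_id in game_ids:
    response = requests.get(f"{BASE_URL}/boxscore/{game_id}", headers=HEADERS, timeout=30)
    response.raise_for_status()
    box_scores.append(response.json())

print(f"Retrieved box scores for {len(box_scores)} games")
```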

Now that you have set up all your desired streams and thoroughly tested them with the Builder’s built-in testing capabilities, you can publish the connector and it will appear as an available source connector. Note that you can easily make changes to the connector at any time; the changes won’t propagate until you publish an updated version, which will then automatically roll out to any connection that uses this connector.

Deploying the Pipeline

Now that you have successfully initialized a source and a destination, you can create a sync pipeline in the Connections tab of the Airbyte UI. Here Airbyte walks you through selecting the existing source and destination that you have created. After Airbyte has connected to the source, select the streams that you want to send through to the warehouse and define the connection settings. This is the crux of your connection, so set a descriptive name and decide whether the sync should run manually or on a schedule. When scheduling your sync, it is important to know when your source is typically updated so that your schedule matches accordingly. You can also further control your pipeline by choosing how your schema is defined (by the source or destination schema), how it should respond to errors, and whether it should notify you of any schema changes (breaking or not).

Conclusion

The data pipeline build might not be what most end users in your company ultimately see, but your pipeline and warehousing setup provides massive leverage, for better or worse, when working towards becoming a data-driven organization. Done correctly, your company will be well on its way to making more intelligent decisions than ever before, and you will empower your analysts to drive amazing insights on top of the foundation you’ve built.

About Us

South Shore Analytics (SSA) is an Analytics Consulting Firm, co-founded by James Burke and Nick Lisauskas, both highly skilled professionals with more than a decade of invaluable experience in the field of Analytics. Their shared passion for data-driven insights and business optimization led them to establish SSA, aiming to provide top-notch services to various businesses, irrespective of their size or stage of development.