
Using The Extensible API

Intro

Note

This page explains the Extensible Data Skipping framework API.
See the Concepts page for an explanation of the underlying concepts.
See here for the list of currently available plugins.

Xskipper supports adding your own index types and specifying your own data skipping logic, enabling data skipping for queries with UDFs and over a variety of data types.

The pluggability is split between two main areas:

  • Indexing flow - defining a new index type and its metadata (Index, IndexFactory, MetaDataType), along with how that metadata is stored (MetaDataTranslator).

  • Query evaluation flow - defining the data skipping logic (Clause, MetaDataFilter, MetaDataFilterFactory), along with how clauses are executed against the metadatastore (ClauseTranslator).

A new plugin can contain one or more of the above implementations.

This architecture enables registering components from multiple packages.

Using Existing Plugins

To use a plugin, load the relevant implementations using the Registration module (Scala, Python).
For example, in Python:

from xskipper import Registration

# Add MetadataFilterFactory
Registration.addMetadataFilterFactory(spark, 'io.xskipper.plugins.regex.filter.RegexValueListMetaDataFilterFactory')
# Add IndexFactory
Registration.addIndexFactory(spark, 'io.xskipper.plugins.regex.index.RegexIndexFactory')
# Add MetaDataTranslator
Registration.addMetaDataTranslator(spark, 'io.xskipper.plugins.regex.parquet.RegexValueListMetaDataTranslator')
# Add ClauseTranslator
Registration.addClauseTranslator(spark, 'io.xskipper.plugins.regex.parquet.RegexValueListClauseTranslator')

And the same registration in Scala:

import io.xskipper._
import io.xskipper.plugins.regex.filter.RegexValueListMetaDataFilterFactory
import io.xskipper.plugins.regex.index.RegexIndexFactory
import io.xskipper.plugins.regex.parquet.{RegexValueListClauseTranslator, RegexValueListMetaDataTranslator}

// register the plugin's factories and translators
Registration.addIndexFactory(RegexIndexFactory)
Registration.addMetadataFilterFactory(RegexValueListMetaDataFilterFactory)
Registration.addClauseTranslator(RegexValueListClauseTranslator)
Registration.addMetaDataTranslator(RegexValueListMetaDataTranslator)

For the full list of plugins see here.


Note

When registering multiple plugins, the order of registration for IndexFactory, MetadataTranslator, and ClauseTranslator matters. If two plugins define relevant translations or index creation for the same parameters, the first one registered will be used.
In general, you should avoid having multiple plugins that behave differently for the same indexes, metadata types, or clauses.

The following sections explain how to use the above interfaces to create a new plugin.
The explanations use examples from the sample plugin, xskipper-regex-plugin.

The regex plugin enables indexing a text column by specifying a list of patterns and saving the matching substrings as a value list.

For example, consider an application log dataset and one of its objects:

application_name,log_line
batch job,20/12/29 18:04:39 INFO FileSourceStrategy: Pruning directories with:
batch job,20/12/29 18:04:40 INFO DAGScheduler: ResultStage 22 (collect at ParquetMetadataHandle.scala:324) finished in 0.011 s

and the regex pattern ".* .* .* (.*): .*".

When we index using the regex index, the metadata that will be saved is List("FileSourceStrategy", "DAGScheduler").

The following query benefits from this index and skips the above object, since 'MemoryStore' does not appear in the stored value list:

SELECT * 
FROM tbl 
WHERE 
regexp_extract(log_line, '.* .* .* (.*): .*', 1) = 'MemoryStore'

Indexing Flow

Define the abstract metadata

Implementation(s) of MetaDataType

First, you need to define the abstract metadata type that will be generated by the index. This type holds the metadata in memory.
For example, the MinMax index metadata type is a tuple of min and max values (see here).

For the Regex plugin, to store the unique list of matching substrings for a given pattern we use a HashSet of Strings (see here).
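
As a rough sketch, the in-memory type can be as simple as a case class wrapping the set; the class name here is illustrative, and a real plugin would extend xskipper's MetaDataType base class:

import scala.collection.mutable.HashSet

// Illustrative sketch (not the plugin's exact class): the in-memory
// metadata is simply the set of unique substrings matched so far.
// A real implementation extends xskipper's MetaDataType base class.
case class RegexValueListSketch(values: HashSet[String]) extends Serializable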

Define a new index

Implementation(s) of Index along with IndexFactory

Support for a new index is added by defining a new class that extends the Index abstract class.
A new index can use an existing MetaDataType or define its own MetaDataType along with a translation specification to the relevant metadatastore.

The Index interface enables specifying one of two ways to collect the metadata:

  • Tree Reduce - in this code path the index processes the object row by row, updating its internal state to reflect each row's contribution to the metadata. This mode enables running index creation in parallel for multiple indexes.

  • Optimized - using this interface the index processes the entire object DataFrame and generates the metadata.

For example, both the MinMaxIndex and the RegexValueListIndex use the Tree Reduce mode; the regex index uses it to accumulate the set of unique matches.
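
To make the Tree Reduce mode concrete, here is a self-contained sketch of the accumulate-and-merge pattern it relies on; the names and method shapes are ours for illustration and do not match xskipper's actual Index signatures:

import scala.util.matching.Regex

// Illustrative accumulator for a regex value-list index: each row updates
// the state, and two partial states merge associatively, which is what
// allows metadata collection to run in parallel (tree reduction).
case class ValueListState(values: Set[String]) {
  // Process one row: add the first capture group if the pattern matches.
  def update(pattern: Regex, logLine: String): ValueListState =
    pattern.findFirstMatchIn(logLine)
      .map(m => ValueListState(values + m.group(1)))
      .getOrElse(this)

  // Merge two partial states produced on different partitions.
  def merge(other: ValueListState): ValueListState =
    ValueListState(values ++ other.values)
}

Feeding the two sample log lines above through update with the pattern ".* .* .* (.*): .*" leaves the state holding Set("FileSourceStrategy", "DAGScheduler"), matching the metadata shown earlier.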

Along with the Index you need to define an IndexFactory, which specifies how to recreate the index instance when its parameters are loaded from the metadatastore. For example, see RegexIndexFactory.
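
Schematically, a factory's job is to rebuild an index instance from its stored name and parameters; the sketch below uses stand-in types and a hypothetical parameter key, not the real IndexFactory API:

// Stand-in index types for illustration only.
sealed trait IndexSketch
case class RegexValueListIndexSketch(col: String, patterns: Seq[String]) extends IndexSketch

// Recreate an index from the name, parameters, and columns persisted in the
// metadatastore. "patterns" is a hypothetical parameter key.
def createIndex(name: String, params: Map[String, String], cols: Seq[String]): Option[IndexSketch] =
  name match {
    case "regexvaluelist" =>
      params.get("patterns").map(p => RegexValueListIndexSketch(cols.head, p.split(';').toSeq))
    case _ => None
  }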

Define translation for the metadata

Implementation(s) of MetaDataTranslator

Info

By default, Xskipper uses Parquet as the metadatastore.
The Parquet metadatastore stores the metadata for the objects as rows in parquet files (for more details see here).
The API enables defining your own metadatastore; here we focus on storing the metadata in the Parquet metadatastore, so the translations covered below relate to it.

To store the abstract metadata defined above, we must specify a suitable translation that maps it to a valid representation for the metadatastore.

For the Parquet metadatastore we have two options:

  • Convert the metadata to an internal Spark Row, which will later be saved to Parquet automatically, as Spark supports Parquet out of the box. For example, the MinMax index translates its values to a nested row with min and max values (see here).

  • Use a UDT to save the serialized abstract metadata. In some cases translating the metadata to a Spark Row is not possible; in such cases we save the metadata as a serialized binary.
    To do so you have two options:

    • Use the default Java serialization provided by the Parquet metadatastore. To do so you need to define the UDT and register it. For example, for the BloomFilter index, we use the following definition to get the default Java serialization:
    class BloomFilterMetaDataTypeUDT extends MetadataTypeUDT[BloomFilterMetaData]
    

    Then register the UDT to Spark using the ParquetMetadataStoreUDTRegistration object:

    ParquetMetadataStoreUDTRegistration.registerUDT(classOf[BloomFilterMetaData].getName, classOf[BloomFilterMetaDataTypeUDT].getName)
    

    Note that the bloom filter has the above definition built in, so there is no need to register it yourself.

    Note

    The UDT must be defined and registered in any program that uses the index.
    A recommended pattern is to define and register the UDT in the Clause Translator object where you also define the Clause Translation logic.
    This object will be loaded when registering the Clause Translator.

    • Define your own UDT with custom serialization logic - similar to the above, only this time you implement the serialization yourself.
      See the MetadataTypeUDT class for a reference.

For the regex plugin we use the first option and translate the list of values to an array of values for storing in Parquet format (see here).
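
Schematically, this translation just flattens the in-memory set into a sequence, which Spark can write to Parquet as an array column (a sketch, not the plugin's exact translator):

import org.apache.spark.sql.Row

// Sketch: wrap the set of matches in a Row as a Seq, which Spark encodes
// as an ARRAY<STRING> column when the metadata is written to Parquet.
def translateValueList(values: Set[String]): Row = Row(values.toSeq)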

Query Evaluation Flow

Define the abstract clause

Implementations of Clause

First, you need to define the abstract clause that will be created by the Filter.
The Clause specifies an abstract condition deduced from the query; it operates on the metadata to determine the relevant objects. Each Clause is then translated to an explicit implementation according to the metadatastore type.

For example, for the MinMax index we define a MinMaxClause, which follows the logic presented here.

For the Regex Plugin we use a Clause which holds the required matching patterns from the query (see here).
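
As a rough illustration, such a clause can be a plain case class carrying the pattern and the values the query requires; the shape is ours, not the plugin's exact Clause definition:

// Illustrative sketch of an abstract clause: it only describes the
// condition; a metadatastore-specific translation later makes it executable.
case class RegexValueListClauseSketch(pattern: String, values: Seq[String]) extends Serializable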

Define a new filter

Implementation(s) of MetaDataFilter along with MetaDataFilterFactory

The filter processes the query tree and labels it with clauses. In most cases we simply want to map expressions to clauses. Therefore, xskipper provides a basic filter implementation called BaseMetadataFilter, which handles AND and OR operators in the query tree automatically, leaving the user to handle only the remaining expressions. Implementations that extend BaseMetadataFilter need only specify how expressions are mapped to clauses.
For example, RegexValueListFilter and MinMaxFilter map the query expressions according to the logic presented here.
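
The sketch below illustrates the expression-to-clause mapping idea using Catalyst pattern matching; the clause type is the simplified stand-in from the previous section, and the surrounding filter machinery is omitted:

import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, Literal, RegExpExtract}

// Simplified stand-in clause (repeated here for self-containment).
case class RegexValueListClauseSketch(pattern: String, values: Seq[String])

// Map a single expression to a clause, in the spirit of BaseMetadataFilter:
// recognize regexp_extract(col, pattern, group) = 'literal' and emit a
// clause requiring 'literal' to appear among the stored matches.
def toClause(expr: Expression): Option[RegexValueListClauseSketch] = expr match {
  case EqualTo(RegExpExtract(_, Literal(pattern, _), _), Literal(value, _)) =>
    Some(RegexValueListClauseSketch(pattern.toString, Seq(value.toString)))
  case _ => None
}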

A more advanced filter can process the entire tree by implementing the MetaDataFilter class without using the BaseMetadataFilter.

Along with a Filter you need to define a MetadataFilterFactory, which specifies which filters should run given the available indexes. For example, see RegexValueListMetaDataFilterFactory.

Define translation for the clause

Implementations of ClauseTranslator

Info

Xskipper uses Parquet as the metadatastore by default.
The Parquet metadatastore stores the metadata for the objects as rows in parquet files (for more details see here).
Spark is used as the engine to run the abstract clauses on the metadata.
The API enables defining your own metadatastore; here we focus on the Parquet metadatastore, so the translations covered below relate to it.

In order to process a clause, the abstract clause defined above needs to be translated to a form that is executable by the metadatastore.

For the Parquet metadatastore we have two options:

  • Translate the Clause to a native Spark operation - this is useful when a built-in Spark expression can process the metadata. For example, for the MinMax index we use Spark's built-in inequality operators (>, <, >=, <=) to translate the abstract clause (see here).

  • Use a UDF that will process the metadata - this is useful when the metadata is saved by serializing the abstract metadata type, or when no built-in operation implements the logic needed to process the metadata.
    For example, for the BloomFilter index, whose metadata is serialized, we use a UDF to check whether a given value exists in the metadata (see here); a sketch follows this list.
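
As an illustration of the UDF option, the sketch below uses Spark's built-in BloomFilter sketch class as a stand-in for the plugin's serialized metadata format:

import java.io.ByteArrayInputStream

import org.apache.spark.sql.functions.udf
import org.apache.spark.util.sketch.BloomFilter

// Sketch: deserialize the binary metadata stored in the Parquet
// metadatastore and test membership for the value required by the clause.
val mightContainUDF = udf { (serialized: Array[Byte], value: String) =>
  BloomFilter.readFrom(new ByteArrayInputStream(serialized)).mightContain(value)
}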

For the Regex Plugin we translate the clause using Spark's arrays_overlap and array_except functions to check whether the values required by the clause exist in the metadata (see here).
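
A translation along those lines can be sketched as follows; the metadata column reference is illustrative:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{array, arrays_overlap, lit}

// Sketch: translate the abstract clause into a native Spark predicate over
// the stored ARRAY<STRING> metadata column - an object is relevant only if
// its stored value list overlaps the values required by the query.
def translateClause(metadataCol: Column, requiredValues: Seq[String]): Column =
  arrays_overlap(metadataCol, array(requiredValues.map(lit): _*))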