Use Cases
In Databricks, the Unity Catalog and Non-Unity Catalog use cases refer to how data governance, access control, and cataloging of data assets are managed within the Databricks environment.
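The practical difference for UDFs is scope and governance: without Unity Catalog, a UDF is registered per Spark session, while with Unity Catalog a function can be created as a governed object under a catalog and schema and secured with GRANT/REVOKE. A minimal sketch of the two styles (catalog, schema, and function names are illustrative, and the UDF body is a trivial placeholder):
%python
from pyspark.sql.types import StringType

# Non-Unity Catalog: session-scoped registration. The UDF exists only for the
# lifetime of this Spark session and is not governed by catalog ACLs.
spark.udf.register("session_upper", lambda s: s.upper() if s else s, StringType())

# Unity Catalog: a SQL function under catalog.schema is a governed securable
# whose access is controlled like tables and views.
spark.sql("""
  CREATE OR REPLACE FUNCTION my_catalog.my_schema.upper_demo(s STRING)
  RETURNS STRING
  RETURN upper(s)
""")
spark.sql("GRANT EXECUTE ON FUNCTION my_catalog.my_schema.upper_demo TO `account users`")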
Non-Unity Catalog
Register Examples in Databricks Notebook
Character Data
Encryption
%scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
val ThalesencryptCharUDF: UserDefinedFunction = udf((data: String) => {
  try {
    example.ThalesDataBricksCRDPFPE.thales_crdp_udf(data, "encrypt", "char")
  } catch {
    case e: Exception => null
  }
})
// Register the UDF
spark.udf.register("ThalesencryptCharUDF", ThalesencryptCharUDF)
// Not needed if using UC:
// spark.sql("CREATE OR REPLACE FUNCTION ThalesencryptCharUDF AS 'example.ThalesencryptCharUDF'")
spark.sql("SELECT ThalesencryptCharUDF('thisisatest')").show()
Decryption
%scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
val ThalesdecryptCharUDF: UserDefinedFunction = udf((data: String) => {
  try {
    example.ThalesDataBricksCRDPFPE.thales_crdp_udf(data, "decrypt", "char")
  } catch {
    case e: Exception => null
  }
})
// Register the UDF
spark.udf.register("ThalesdecryptCharUDF", ThalesdecryptCharUDF)
// Not needed if using UC:
// spark.sql("CREATE OR REPLACE FUNCTION ThalesdecryptCharUDF AS 'example.ThalesdecryptCharUDF'")
spark.sql("SELECT ThalesdecryptCharUDF('Ma9zhQvxnEX')").show()
Number Data
Encryption
%scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
val ThalesencryptNbrUDF: UserDefinedFunction = udf((data: String) => {
  try {
    example.ThalesDataBricksCRDPFPE.thales_crdp_udf(data, "encrypt", "nbr")
  } catch {
    case e: Exception => null
  }
})
// Register the UDF
spark.udf.register("ThalesencryptNbrUDF", ThalesencryptNbrUDF)
// Not needed if using UC:
// spark.sql("CREATE OR REPLACE FUNCTION ThalesencryptNbrUDF AS 'example.ThalesencryptNbrUDF'")
spark.sql("SELECT ThalesencryptNbrUDF('-46')").show()
Decryption
%scala
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
val ThalesdecryptNbrUDF: UserDefinedFunction = udf((data: String) => {
  try {
    example.ThalesDataBricksCRDPFPE.thales_crdp_udf(data, "decrypt", "nbr")
  } catch {
    case e: Exception => null
  }
})
// Register the UDF
spark.udf.register("ThalesdecryptNbrUDF", ThalesdecryptNbrUDF)
// Not needed if using UC:
// spark.sql("CREATE OR REPLACE FUNCTION ThalesdecryptNbrUDF AS 'example.ThalesdecryptNbrUDF'")
spark.sql("SELECT ThalesdecryptNbrUDF('914171854902')").show()
Unity Catalog
Register Examples in Databricks Notebook
Number Data
%scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{udf, explode, array}
import scala.collection.JavaConverters._
import example.ThalesDataBricksCRDPFPE
// Initialize Spark Session
val spark = SparkSession.builder()
  .appName("Thales UDF Example")
  .getOrCreate()
import spark.implicits._ // for the $ column syntax
// Define the UDF
val ThalesDatabricksCRDPFPE = udf((data: String) => {
  try {
    val result = example.ThalesDataBricksCRDPFPE.thales_crdp_udf(data, "protect", "nbr")
    Seq(result.toString)
  } catch {
    case e: Exception => Seq.empty[String]
  }
})
// Register the UDF
spark.udf.register("ThalesDatabricksCRDPFPE", ThalesDatabricksCRDPFPE)
// Create a DataFrame with your reveal_data
val revealDF = spark.sql("SELECT c_custkey FROM my_catalog.my_schema.customer limit 5")
//val revealDF = spark.sql("SELECT c_custkey FROM samples.tpch.customer limit 5")
// Apply the UDF and explode the array result into rows
val encryptedDF = revealDF
  .withColumn("encrypted_values", ThalesDatabricksCRDPFPE($"c_custkey"))
  .select($"c_custkey", explode($"encrypted_values").as("encrypted_value"))
// Show the result
display(encryptedDF)
c_custkey  encrypted_value
412445     oO2YCC
412446     aLyGcw
412447     SMXUv0
412448     nxXOx6
412449     Wp3Dse
The output above comes from a sample query run in a Databricks notebook cell using one of the UDFs. Note that the query must run after the registration, in the same session.
The example above uses an internal policy, which embeds the CRDP key metadata in the protected data. With a CRDP protection policy based on an external policy, the results contain only the format-preserved ciphertext, without the embedded metadata prefix.
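The sample values later in this document illustrate the difference: internal-policy ciphertexts carry a 7-character key-version prefix (for example 1001000) ahead of the format-preserved output, which is why the Python UDFs below strip it with protected_data[7:] when showrevealinternalkey is disabled. A small illustration using values taken from the samples below:
%python
# Value copied from the reveal_data samples later in this document.
internal_ct = "1001000hhIQGjma#fH5yPPOda"  # internal policy: metadata embedded
print(internal_ct[:7])   # 1001000 -> embedded CRDP key-version metadata
print(internal_ct[7:])   # hhIQGjma#fH5yPPOda -> ciphertext without the prefix,
                         # matching the external-policy sample format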
Examples
These examples work for both the Unity Catalog and Non-Unity Catalog use cases. Running Python code is much simpler than Java: create your Python application in a Databricks notebook and run it.
There are two examples provided:
The first example uses the CRDP REST API to make a call for each data element.
The second example processes all data elements at once using the CRDP REST bulk API.
Both examples take three parameters: the data value, the mode of operation (protect/reveal for the first example, protectbulk/revealbulk for the second), and the datatype (char or nbr).
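Both examples read their CRDP connection and policy settings from a udfConfig.properties file in the driver's working directory. The keys below are exactly the ones the example code reads; the values are illustrative assumptions and must be adjusted to match your CRDP deployment:
# udfConfig.properties -- illustrative values only
# Hostname or IP of the CRDP container/service (port 8090 is hardcoded in the code)
CRDPIP=192.168.1.100
# Name of the CRDP protection policy to use (assumed name)
protection_profile=plain-nbr-internal
# Where key metadata lives: internal or external
keymetadatalocation=internal
# External key version, used when keymetadatalocation=external
keymetadata=1001000
# When no: strip the embedded 7-character metadata prefix from protect results
showrevealinternalkey=yes
# When yes: swallow errors and return ciphertext for users without key access
returnciphertextforuserwithnokeyaccess=no
# Optional user-set lookup flag (read by Example 1)
usersetlookup=no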
Example 1 (protect, reveal)
%python
import requests
import json
import decimal
from decimal import Decimal, InvalidOperation
import pandas as pd
from pyspark.sql.functions import pandas_udf, lit, udf
from pyspark.sql.types import StringType

# Load properties from a configuration file
properties = {}
with open("udfConfig.properties") as prop_file:
    for line in prop_file:
        name, value = line.partition("=")[::2]
        properties[name.strip()] = value.strip()

REVEALRETURNTAG = "data"
PROTECTRETURNTAG = "protected_data"
def thales_crdp_python_function(databricks_inputdata, mode, datatype):
    encdata = ""
    # Normalize non-char input to a string
    if datatype.lower() != "char":
        databricks_inputdata = str(databricks_inputdata)
    # Check for invalid data
    if databricks_inputdata is not None and databricks_inputdata.strip():
        # Return input shorter than 2 characters unchanged
        if len(databricks_inputdata) < 2:
            return databricks_inputdata
        # Extra numeric checks when datatype is not 'char'
        if datatype.lower() != "char":
            lower_bound = -9
            upper_bound = -1
            try:
                # Convert the string to a Decimal
                number = Decimal(databricks_inputdata)
                # Return single-digit negative numbers (-9 to -1) unchanged
                if lower_bound <= number <= upper_bound:
                    print("The input is a negative number between -9 and -1.")
                    return databricks_inputdata
            except (InvalidOperation, ValueError):
                print("The input is not a valid number.")
                return databricks_inputdata
    #else:
    #    # Return input if it is None or empty
    #    return databricks_inputdata
    # Fetch properties
    crdpip = properties.get("CRDPIP")
    if not crdpip:
        raise ValueError("No CRDPIP found for UDF.")
    return_ciphertext_for_user_without_key_access = (
        properties.get("returnciphertextforuserwithnokeyaccess", "no").lower() == "yes"
    )
    user_set_lookup = properties.get("usersetlookup", "no").lower() == "yes"
    key_metadata_location = properties.get("keymetadatalocation")
    external_version_from_ext_source = properties.get("keymetadata")
    protection_profile = properties.get("protection_profile")
    # Print protection profile and key metadata location for debugging
    #print("Protection Profile:", protection_profile)
    #print("Key Metadata Location:", key_metadata_location)
    data_key = "data"
    if mode == "reveal":
        data_key = "protected_data"
    try:
        json_tag_for_protect_reveal = (
            PROTECTRETURNTAG if mode == "protect" else REVEALRETURNTAG
        )
        show_reveal_key = (
            properties.get("showrevealinternalkey", "yes").lower() == "yes"
        )
        sensitive = databricks_inputdata
        # Prepare payload for the protect/reveal request
        crdp_payload = {
            "protection_policy_name": protection_profile,
            data_key: sensitive,
        }
        if mode == "reveal":
            crdp_payload["username"] = "admin"
            if key_metadata_location.lower() == "external":
                crdp_payload["external_version"] = external_version_from_ext_source
        # Construct URL and make the HTTP request
        url_str = f"http://{crdpip}:8090/v1/{mode}"
        headers = {"Content-Type": "application/json"}
        response = requests.post(
            url_str, headers=headers, data=json.dumps(crdp_payload)
        )
        response_json = response.json()
        if response.ok:
            protected_data = response_json.get(json_tag_for_protect_reveal)
            if (
                mode == "protect"
                and key_metadata_location.lower() == "internal"
                and not show_reveal_key
            ):
                protected_data = (
                    protected_data[7:] if len(protected_data) > 7 else protected_data
                )
            encdata = protected_data
        else:
            raise ValueError(f"Request failed with status code: {response.status_code}")
    except Exception as e:
        print(f"Exception occurred: {e}")
        if return_ciphertext_for_user_without_key_access:
            pass
        else:
            raise e
    return encdata
# Register the UDF
thales_crdp_python_udf = udf(thales_crdp_python_function, StringType())
spark.udf.register("thales_crdp_python_udf", thales_crdp_python_udf)

# Simple test
sensitive = "45444555"
singlevalue = thales_crdp_python_function(sensitive, "protect", "nbr")
print(f"sensitive: {sensitive}, Encrypted: {singlevalue}")
# Step 1: Fetch data using SQL queries
query_result_bal = spark.sql(
    "SELECT c_acctbal FROM samples.tpch.customer where c_acctbal > 1000 LIMIT 5"
)
query_result = spark.sql("SELECT c_name FROM samples.tpch.customer LIMIT 5")
# Step 2: Extract values into lists
names = [row.c_name for row in query_result.collect()]
bals = [row.c_acctbal for row in query_result_bal.collect()]
mode = "protect"
key_metadata_location = properties.get("keymetadatalocation")
datatype = "nbr"
if mode == "protect":
    if datatype == "char":
        encrypted_names = [thales_crdp_python_function(name, "protect", "char") for name in names]
        for original, encrypted in zip(names, encrypted_names):
            print(f"Original: {original}, Encrypted: {encrypted}")
        # Apply the UDF to the DataFrame
        df_with_encrypted_names = query_result.withColumn(
            "encrypted_name",
            thales_crdp_python_udf(query_result.c_name, lit("protect"), lit("char")),
        )
        # Show the results
        df_with_encrypted_names.show()
    else:
        encrypted_bals = [thales_crdp_python_function(bal, "protect", "nbr") for bal in bals]
        for original, encrypted in zip(bals, encrypted_bals):
            print(f"Original: {original}, Encrypted: {encrypted}")
else:
    if datatype == "char":
        if key_metadata_location == "external":
            reveal_data = ['INlGNUmQ#k6ooUjm2A', 'KAE09OFh#eiQhgLgfI', 'RsqD5rr9#B4spSHZvD', 'HXguI0J5#TVXNJgtS5', 'MIt7SMGA#iuikyynnl']
        else:
            reveal_data = ['1001000hhIQGjma#fH5yPPOda', '1001000rXhadmXl#XxRlFW0YU', '1001000BvDFNg8p#NbyCunqpT', '1001000w5U1O9iu#59Ulv2yA8', '1001000CzyGv0KB#mFf9pcKUY']
    else:
        if key_metadata_location == "external":
            reveal_data = ['7080.44', '7382.75', '1971.05', '8044.16', '2361.92']
            #reveal_data = ['41', '-53', '343075883524', '22262837758', '208755']
        else:
            reveal_data = ['10020007080.44', '10020007382.75', '10020001971.05', '10020008044.16', '10020002361.92']
    # Call the function with the test data
    reveal_results = [thales_crdp_python_function(protecteddata, mode, datatype) for protecteddata in reveal_data]
    print("Results:", reveal_results)
Sample output
Original: Customer#000412445, Encrypted: INlGNUmQ#k6ooUjm2A
Original: Customer#000412446, Encrypted: KAE09OFh#eiQhgLgfI
Original: Customer#000412447, Encrypted: RsqD5rr9#B4spSHZvD
Original: Customer#000412448, Encrypted: HXguI0J5#TVXNJgtS5
Original: Customer#000412449, Encrypted: MIt7SMGA#iuikyynnl
Original: 5358.33, Encrypted: s3Lc.We
Original: 9441.59, Encrypted: bbx9.bw
Original: 7868.75, Encrypted: JYUW.ws
Original: 6060.98, Encrypted: DStY.ZQ
Original: 4973.84, Encrypted: abwo.vE
You can also register the function as a UDF and apply it to a DataFrame:
# Register the UDF
thales_crdp_python_udf = udf(thales_crdp_python_function, StringType())
spark.udf.register("thales_crdp_python_udf", thales_crdp_python_udf)
# Test the function
# Create a DataFrame
df = spark.sql("SELECT c_name FROM samples.tpch.customer LIMIT 50")
# Apply the UDF to the DataFrame
df_with_encrypted_names = df.withColumn("encrypted_name", thales_crdp_python_udf(df.c_name, lit("protect"), lit("char")))
# Show the results
df_with_encrypted_names.show()
And then reference it in other cells.
%scala
spark.sql("SELECT thales_crdp_python_udf(c_name, 'protect', 'char') AS encrypted_name FROM samples.tpch.customer LIMIT 50").show()
%sql
SELECT thales_crdp_python_udf(c_name, 'protect', 'char') AS encrypted_name FROM samples.tpch.customer LIMIT 50
Example 2 (protectbulk, revealbulk)
This example makes a single call to the CRDP REST bulk API, passing in the entire column to be processed.
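The request and response shapes used by the bulk endpoints can be read off the function below; a minimal illustration with made-up values (the policy name is an assumption):
%python
# Shapes as constructed and consumed by thales_crdp_python_function_bulk below.
# protectbulk request: one data_array carrying the whole column
protectbulk_request = {
    "protection_policy_name": "plain-nbr-internal",  # assumed policy name
    "data_array": ["7080.44", "7382.75"],
}
# protectbulk response: protected_data_array of protected_data elements
protectbulk_response = {
    "protected_data_array": [
        {"protected_data": "10020007080.44"},
        {"protected_data": "10020007382.75"},
    ]
}
# revealbulk reverses the keys: protected_data_array in, data_array out.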
%python
import requests
import json
from decimal import Decimal, InvalidOperation

# Load properties from a configuration file
properties = {}
with open("udfConfig.properties") as prop_file:
    for line in prop_file:
        name, value = line.partition("=")[::2]
        properties[name.strip()] = value.strip()
def check_valid(databricks_inputdata, datatype):
    BADDATATAG = "99999999999"
    # Check for invalid data
    if databricks_inputdata is not None and databricks_inputdata.strip():
        # Tag input whose length is less than 2
        if len(databricks_inputdata) < 2:
            return BADDATATAG + databricks_inputdata
        # Extra numeric checks when datatype is not 'char'
        if datatype.lower() != "char":
            lower_bound = -9
            upper_bound = -1
            try:
                # Convert the string to a Decimal
                number = Decimal(databricks_inputdata)
                # Tag single-digit negative numbers (-9 to -1)
                if lower_bound <= number <= upper_bound:
                    #print("The input is a negative number between -9 and -1.")
                    return BADDATATAG
            except (InvalidOperation, ValueError):
                #print("The input is not a valid number.")
                return BADDATATAG
    else:
        # Return the bad-data tag if input is None or empty
        return BADDATATAG
    return databricks_inputdata
def prepare_reveal_input(protected_data, protection_policy_name, key_metadata_location, external_version=None):
    # Base reveal payload structure
    reveal_payload = {
        "protection_policy_name": protection_policy_name,
        "username": "admin",
        "protected_data_array": []
    }
    # Add external version if key_metadata_location is 'external'
    if key_metadata_location == "external" and external_version:
        reveal_payload["protected_data_array"] = [
            {"protected_data": data, "external_version": external_version} for data in protected_data
        ]
    else:
        reveal_payload["protected_data_array"] = [
            {"protected_data": data} for data in protected_data
        ]
    return reveal_payload
def thales_crdp_python_function_bulk(databricks_inputdata, mode, datatype):
    encdata = []
    # Convert input data to strings and validate when datatype is not 'char'
    if datatype.lower() != "char":
        databricks_inputdata = [check_valid(str(data), datatype) for data in databricks_inputdata]
        print("databricks_inputdata", databricks_inputdata)
    # Fetch properties
    crdpip = properties.get("CRDPIP")
    if not crdpip:
        raise ValueError("No CRDPIP found for UDF.")
    return_ciphertext_for_user_without_key_access = (
        properties.get("returnciphertextforuserwithnokeyaccess", "no").lower() == "yes"
    )
    key_metadata_location = properties.get("keymetadatalocation")
    external_version_from_ext_source = properties.get("keymetadata")
    protection_profile = properties.get("protection_profile")
    if mode == "protectbulk":
        input_data_key_array = "data_array"
        output_data_key_array = "protected_data_array"
        output_element_key = "protected_data"
    else:
        input_data_key_array = "protected_data_array"
        output_data_key_array = "data_array"
        output_element_key = "data"
    print("mode:", mode)
    try:
        show_reveal_key = (
            properties.get("showrevealinternalkey", "yes").lower() == "yes"
        )
        # Prepare payload for bulk protect/reveal request
        if mode == "protectbulk":
            crdp_payload = {
                "protection_policy_name": protection_profile,
                input_data_key_array: databricks_inputdata,
            }
        else:
            if key_metadata_location == "external":
                crdp_payload = prepare_reveal_input(
                    databricks_inputdata, protection_profile,
                    key_metadata_location, external_version_from_ext_source,
                )
            else:
                crdp_payload = prepare_reveal_input(
                    databricks_inputdata, protection_profile, key_metadata_location
                )
        if mode == "revealbulk":
            crdp_payload["username"] = "admin"
        # Construct URL and make the HTTP request
        url_str = f"http://{crdpip}:8090/v1/{mode}"
        headers = {"Content-Type": "application/json"}
        print("Sending request to URL:", url_str)
        data = json.dumps(crdp_payload)
        print("Sending data:", data)
        response = requests.post(
            url_str, headers=headers, data=json.dumps(crdp_payload)
        )
        response_json = response.json()
        if response.ok:
            protected_data_array = response_json.get(output_data_key_array, [])
            encdata = [item[output_element_key] for item in protected_data_array]
            # Handle response for bulk data
            if mode == "protectbulk" and key_metadata_location.lower() == "internal" and not show_reveal_key:
                encdata = [
                    data[7:] if len(data) > 7 else data for data in encdata
                ]
        else:
            raise ValueError(f"Request failed with status code: {response.status_code}")
    except Exception as e:
        print(f"Exception occurred: {e}")
        if return_ciphertext_for_user_without_key_access:
            pass
        else:
            raise e
    return encdata
# Step 1: Fetch data using SQL queries
query_result_bal = spark.sql(
    "SELECT c_acctbal FROM samples.tpch.customer where c_acctbal > 1000 LIMIT 5"
)
query_result = spark.sql("SELECT c_name FROM samples.tpch.customer LIMIT 5")
# Step 2: Extract values into lists
names = [row.c_name for row in query_result.collect()]
bals = [row.c_acctbal for row in query_result_bal.collect()]
mode = "protectbulk"
key_metadata_location = properties.get("keymetadatalocation")
datatype = "nbr"
# Step 3: Make the call depending on the mode and datatype. Note you must also
# make sure the udfConfig.properties file contains settings that are compatible
# with the test you are running.
if mode == "protectbulk":
    if datatype == "char":
        encrypted_names = thales_crdp_python_function_bulk(names, mode, datatype)
        print("Encrypted Names:", encrypted_names)
    else:
        encrypted_bals = thales_crdp_python_function_bulk(bals, mode, datatype)
        print("Encrypted Balance nbr:", encrypted_bals)
else:
    if datatype == "char":
        if key_metadata_location == "external":
            reveal_data = ['INlGNUmQ#k6ooUjm2A', 'KAE09OFh#eiQhgLgfI', 'RsqD5rr9#B4spSHZvD', 'HXguI0J5#TVXNJgtS5', 'MIt7SMGA#iuikyynnl']
        else:
            reveal_data = ['1001000hhIQGjma#fH5yPPOda', '1001000rXhadmXl#XxRlFW0YU', '1001000BvDFNg8p#NbyCunqpT', '1001000w5U1O9iu#59Ulv2yA8', '1001000CzyGv0KB#mFf9pcKUY']
    else:
        if key_metadata_location == "external":
            reveal_data = ['7080.44', '7382.75', '1971.05', '8044.16', '2361.92']
            #reveal_data = ['41', '-53', '343075883524', '22262837758', '208755']
        else:
            reveal_data = ['10020007080.44', '10020007382.75', '10020001971.05', '10020008044.16', '10020002361.92']
    # Call the bulk function with the test data
    encrypted_test_data = thales_crdp_python_function_bulk(reveal_data, mode, datatype)
    print("Results:", encrypted_test_data)