from IPython import get_ipython
from IPython.display import *
from kubernetes import client, config
+ import requests
+ import logging

environment = os.getenv('ENVIRONMENT', 'development')  # Default to 'development' if not set

+ logger = logging.getLogger(__name__)
+ logger.setLevel(logging.INFO)
+

# Set the environment variables
def set_env():
    # kubernetes_host = os.environ.get('KUBERNETES_SERVICE_HOST')
@@ -27,56 +32,57 @@ def set_env():

    # Create a Spark session
    # def create_spark(app_name, master_url):
-     spark = SparkSession.builder \
-         .appName(app_name) \
-         .master(kubernetes_url) \
-         .config("spark.submit.deployMode", "client") \
-         .config("spark.driver.host", driver_host) \
-         .config("spark.driver.cores", "1") \
-         .config("spark.driver.memory", "1g") \
-         .config("spark.executor.instances", "1") \
-         .config("spark.executor.cores", "1") \
-         .config("spark.executor.memory", "1g") \
-         .config("spark.kubernetes.namespace", namespace) \
-         .config("spark.kubernetes.container.image", executor_image) \
-         .config("spark.kubernetes.authenticate.driver.serviceAccountName", service_account) \
-         .config("spark.kubernetes.authenticate.executor.serviceAccountName", service_account) \
-         .config("spark.eventLog.enabled", "true") \
-         .config("spark.eventLog.dir", f"gs://{bucket_name}/event-logs/") \
-         .config("spark.history.fs.logDirectory", f"gs://{bucket_name}/event-logs/") \
-         .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
-         .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
-         .config("spark.hadoop.fs.gs.auth.service.account.enable", "true") \
-         .getOrCreate()
+     # spark = SparkSession.builder \
+     #     .appName(app_name) \
+     #     .master(kubernetes_url) \
+     #     .config("spark.submit.deployMode", "client") \
+     #     .config("spark.driver.host", driver_host) \
+     #     .config("spark.driver.cores", "1") \
+     #     .config("spark.driver.memory", "1g") \
+     #     .config("spark.executor.instances", "1") \
+     #     .config("spark.executor.cores", "1") \
+     #     .config("spark.executor.memory", "1g") \
+     #     .config("spark.kubernetes.namespace", namespace) \
+     #     .config("spark.kubernetes.container.image", executor_image) \
+     #     .config("spark.kubernetes.authenticate.driver.serviceAccountName", service_account) \
+     #     .config("spark.kubernetes.authenticate.executor.serviceAccountName", service_account) \
+     #     .config("spark.eventLog.enabled", "true") \
+     #     .config("spark.eventLog.dir", f"gs://{bucket_name}/event-logs/") \
+     #     .config("spark.history.fs.logDirectory", f"gs://{bucket_name}/event-logs/") \
+     #     .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
+     #     .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
+     #     .config("spark.hadoop.fs.gs.auth.service.account.enable", "true") \
+     #     .getOrCreate()

-     return spark
+     # return spark

    # def start():
-     # Configuring the API client
-     config.load_incluster_config()
+     # # Configuring the API client
+     # config.load_incluster_config()

-     # Creating an API instance to interact with the K8s service
-     v1 = client.CoreV1Api()
+     # # Creating an API instance to interact with the K8s service
+     # v1 = client.CoreV1Api()

-     # Fetching the service details
-     service_name = os.environ.get("WEBUI_SERVICE_NAME", "notebook-spark-ui")
-     service = v1.read_namespaced_service(service_name, namespace)
+     # # Fetching the service details
+     # service_name = os.environ.get("WEBUI_SERVICE_NAME", "notebook-spark-ui")
+     # service = v1.read_namespaced_service(service_name, namespace)

-     webui_host = service.status.load_balancer.ingress[0].ip
-     webui_port = spark.sparkContext.uiWebUrl.split(":")[-1]
-     webui_url = f"http://{webui_host}:{webui_port}"
+     # webui_host = service.status.load_balancer.ingress[0].ip
+     # webui_port = spark.sparkContext.uiWebUrl.split(":")[-1]
+     # webui_url = f"http://{webui_host}:{webui_port}"

-     msg = f"**App name**: {app_name}\n\n" + \
-         f"**Master**: {kubernetes_url}\n\n" + \
-         f"**Driver host**: {driver_host}\n\n" + \
-         f"**Spark UI**: {webui_url}"
+     # msg = f"**App name**: {app_name}\n\n" + \
+     #     f"**Master**: {kubernetes_url}\n\n" + \
+     #     f"**Driver host**: {driver_host}\n\n" + \
+     #     f"**Spark UI**: {webui_url}"

-     display(Markdown(msg))
+     # display(Markdown(msg))

class PawMarkSparkSession:

-     def __init__(self, spark_session):
+     def __init__(self, config_json, spark_session):
        self._spark_session = spark_session
+         self._config_json = config_json
        self.history_server_base_url = "http://localhost:18080"

    def __getattr__(self, name):
@@ -94,26 +100,36 @@ def _repr_html_(self):
        return f"""
        <div style="border: 1px solid #e8e8e8; padding: 10px;">
            <h3>Spark Session Information</h3>
+             <p><strong>Config:</strong> {self._config_json}</p>
            <p><strong>Application ID:</strong> {application_id}</p>
            <p><strong>Spark UI:</strong> <a href="{spark_ui_link}">{spark_ui_link}</a></p>
        </div>
        """

def create_spark_dev():
-     spark = PawMarkSparkSession(SparkSession.builder \
-         .appName("PySpark Example") \
-         .master("spark://spark-master:7077") \
-         .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
-         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
-         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
-         .config("spark.eventLog.enabled", "true") \
-         .config("spark.eventLog.dir", "/opt/data/spark-events") \
-         .config("spark.history.fs.logDirectory", "/opt/data/spark-events") \
-         .config("spark.sql.warehouse.dir", "/opt/data/spark-warehouse") \
-         .config("executor.memory", "1g") \
-         .config("executor.cores", "1") \
-         .config("spark.executor.instances", "1") \
-         .getOrCreate())
+     logger.info("Creating Spark session")
+     try:
+         config_json = requests.get("http://server:5002/spark_app/config").json()
+     except Exception as e:
+         logger.error("Error loading config: %s", e)
+         # Fall back to the previous hard-coded defaults so the config lookups below still work.
+         config_json = {'executor.memory': '1g', 'executor.cores': '1', 'spark.executor.instances': '1'}
+
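+     # Assumed shape of the config payload (hypothetical example values; the actual
+     # contents are defined by the config service at http://server:5002/spark_app/config):
+     #     {"executor.memory": "1g", "executor.cores": "1", "spark.executor.instances": "1"}
+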
+     spark = PawMarkSparkSession(
+         config_json,
+         SparkSession.builder \
+             .appName("PySpark Example") \
+             .master("spark://spark-master:7077") \
+             .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
+             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+             .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+             .config("spark.eventLog.enabled", "true") \
+             .config("spark.eventLog.dir", "/opt/data/spark-events") \
+             .config("spark.history.fs.logDirectory", "/opt/data/spark-events") \
+             .config("spark.sql.warehouse.dir", "/opt/data/spark-warehouse") \
+             .config("executor.memory", config_json['executor.memory']) \
+             .config("executor.cores", config_json['executor.cores']) \
+             .config("spark.executor.instances", config_json['spark.executor.instances']) \
+             .getOrCreate()
+     )

    return spark
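
A minimal usage sketch, not part of this commit: assuming the config service above is reachable, a notebook can treat the object returned by create_spark_dev() like a plain SparkSession, since __getattr__ presumably forwards attribute access to the wrapped session, while displaying the object renders the HTML summary from _repr_html_. The DataFrame below uses hypothetical example data.

    spark = create_spark_dev()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    df.show()
    spark  # as the last expression in a cell, this renders the HTML summary card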