1
2"""
3SELECT for BigQuery — automated onboarding script.
4
5Idempotent. Re-running is safe; resources are created/granted only when
6missing.
7
8Prerequisites:
  1. Your billing export is ALREADY configured in the Cloud Console for the
     dataset named below. Without an existing dataset we can't grant access
     to it. See:
       https://cloud.google.com/billing/docs/how-to/export-data-bigquery-setup
  2. You're authenticated as a principal that has, at minimum:
       - Service Usage Admin on PROJECT_ID
       - Project IAM Admin on PROJECT_ID
       - Organization Admin (or a role with
         resourcemanager.organizations.setIamPolicy) on ORG_ID
     The script also creates a service account in PROJECT_ID, sets that
     account's IAM policy, edits the billing dataset's access list, and
     impersonates the new account during verification, so the principal
     needs permission for those operations as well.
     Run once locally:
       gcloud auth application-default login
20
21Install:
22 pip install google-cloud-bigquery google-auth requests
23
24Run:
25 Edit the CONFIG block below, then:
26 python setup-select.py
27"""
28
29
30
31
# Project that hosts the service account and on which the required APIs are
# enabled. main() refuses to run while this still holds the placeholder value.
PROJECT_ID = "customer-billing-project"
# Organization ID that receives the org-level role bindings. main() also
# refuses to run while this still holds the placeholder value.
ORG_ID = "123456789012"


# Billing export dataset. Either a bare dataset name (resolved against
# PROJECT_ID) or "project.dataset" when the dataset lives in another project.
BILLING_DATASET = "billing_export"
# Account ID of the service account; the full address is built by sa_email().
SA_NAME = "select-viewer"

# Region qualifier interpolated into the INFORMATION_SCHEMA queries in verify().
REGION = "region-us"
40
41
42
43import json
44import sys
45import time
46from datetime import datetime
47
48import google.auth
49import google.auth.transport.requests
50import requests
51from google.api_core import exceptions as gax_exc
52from google.auth import impersonated_credentials
53from google.cloud import bigquery
54
55
57
# Service IDs sent to the Service Usage batchEnable call for PROJECT_ID.
REQUIRED_APIS = [
    "bigquery.googleapis.com",
    "iam.googleapis.com",
    "iamcredentials.googleapis.com",
    "cloudresourcemanager.googleapis.com",
    "serviceusage.googleapis.com",
]

# Roles bound to the service account on PROJECT_ID by grant_project_iam().
PROJECT_ROLES = [
    "roles/bigquery.jobUser",
    "roles/bigquery.readSessionUser",
]

# Roles bound to the service account on the organization by grant_org_iam().
ORG_ROLES = [
    "roles/bigquery.resourceViewer",
    "roles/bigquery.metadataViewer",
    "roles/browser",
]
76
77
78
79
def step(msg: str) -> None:
    """Print *msg* as a step heading, preceded by an empty line."""
    heading = "\n==> {}".format(msg)
    print(heading)
82
83
def ok(msg: str) -> None:
    """Print *msg* tagged as a successful result."""
    line = " [OK] {}".format(msg)
    print(line)
86
87
def warn(msg: str) -> None:
    """Print *msg* tagged as a warning."""
    line = " [!] {}".format(msg)
    print(line)
90
91
def fail(msg: str) -> None:
    """Print *msg* tagged as a failure (printing only; does not exit)."""
    line = " [X] {}".format(msg)
    print(line)
94
95
96
97
# Credentials cache: stays None until the first _creds() call fills it.
_CREDS = None
# Transport request object passed to credentials.refresh().
_AUTH_REQ = google.auth.transport.requests.Request()


def _creds():
    """Return the cached default credentials, refreshing them when not valid.

    The credentials are obtained from google.auth.default() with the
    cloud-platform scope on first use and reused on every later call.
    """
    global _CREDS
    if _CREDS is None:
        loaded, _project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        _CREDS = loaded
    if _CREDS.valid:
        return _CREDS
    # Not valid (never refreshed, or expired): refresh before handing out.
    _CREDS.refresh(_AUTH_REQ)
    return _CREDS
111
112
def gcp_api(method: str, url: str, body: dict | None = None,
            allow: tuple[int, ...] = ()) -> dict | None:
    """Send an authenticated JSON request to a GCP REST endpoint.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        url: Full request URL.
        body: Optional payload; serialized to JSON when not None.
        allow: HTTP status codes to tolerate. A response with one of these
            statuses yields None instead of raising (used for idempotency,
            e.g. 409 ALREADY EXISTS).

    Returns:
        The decoded JSON response, or None when the response has no content
        or its status is listed in `allow`.

    Raises:
        RuntimeError: for any other status >= 400; the message carries the
            method, URL, status code and response text.
    """
    request_headers = {
        "Authorization": f"Bearer {_creds().token}",
        "Content-Type": "application/json",
    }
    payload = None if body is None else json.dumps(body)
    response = requests.request(
        method,
        url,
        headers=request_headers,
        data=payload,
        timeout=60,
    )
    status = response.status_code
    if status in allow:
        return None
    if status >= 400:
        raise RuntimeError(
            f"{method} {url} -> HTTP {status}\n{response.text}"
        )
    if not response.content:
        return None
    return response.json()
132
133
134
135
def parse_billing_dataset() -> tuple[str, str]:
    """Split BILLING_DATASET into a (project, dataset) pair.

    A value containing a dot is split at the first dot into project and
    dataset; a value without a dot is paired with PROJECT_ID.
    """
    project, dot, dataset = BILLING_DATASET.partition(".")
    if dot:
        return project, dataset
    return PROJECT_ID, BILLING_DATASET
141
142
143
144
def _await_service_usage_operation(op: dict | None, polls: int = 30,
                                   interval: float = 2.0) -> None:
    """Wait for a Service Usage operation to finish; raise if it fails.

    Args:
        op: The operation dict returned by batchEnable (may be None/empty).
        polls: Maximum number of status polls before giving up.
        interval: Seconds to sleep before each poll.

    Raises:
        RuntimeError: when the finished operation carries an "error" field,
            or when it is still not done after `polls` polls.
    """
    if not op:
        return
    op_name = op.get("name")
    polls_left = polls
    # Without a name there is nothing to poll; the error check below still
    # applies to whatever the initial response contained.
    while op_name and not op.get("done"):
        if polls_left <= 0:
            raise RuntimeError(
                f"batchEnable operation {op_name} still not done after "
                f"{polls} polls; re-run this script in a few minutes."
            )
        polls_left -= 1
        time.sleep(interval)
        op = gcp_api(
            "GET", f"https://serviceusage.googleapis.com/v1/{op_name}"
        ) or {}
    if "error" in op:
        raise RuntimeError(f"batchEnable error: {op['error']}")


def enable_apis() -> None:
    """Step 1: enable REQUIRED_APIS on PROJECT_ID.

    When the billing dataset lives in a different project, also try to
    enable bigquery.googleapis.com there; a permission failure on that
    second project only produces a warning.

    Raises:
        RuntimeError: when a request fails, when the enable operation
            reports an error, or when it does not finish within the
            polling window. An [OK] line is printed only after the
            operation has actually completed.
    """
    step("Step 1/6 — enable required APIs")
    op = gcp_api(
        "POST",
        f"https://serviceusage.googleapis.com/v1/projects/{PROJECT_ID}/services:batchEnable",
        body={"serviceIds": REQUIRED_APIS},
    )
    _await_service_usage_operation(op)
    for api in REQUIRED_APIS:
        ok(api)

    bp, _ = parse_billing_dataset()
    if bp == PROJECT_ID:
        return

    try:
        host_op = gcp_api(
            "POST",
            f"https://serviceusage.googleapis.com/v1/projects/{bp}/services:batchEnable",
            body={"serviceIds": ["bigquery.googleapis.com"]},
        )
        _await_service_usage_operation(host_op)
    except RuntimeError as e:
        text = str(e)
        # gcp_api formats failures as "... -> HTTP <status>"; match on that
        # rather than a bare "403", which could also occur inside the URL.
        if "HTTP 403" in text or "PERMISSION_DENIED" in text:
            warn(f"can't enable APIs on {bp} (no permission). Assuming "
                 "bigquery.googleapis.com is already enabled there.")
        else:
            raise
    else:
        ok(f"bigquery.googleapis.com on {bp} (billing dataset host)")
187
188
def sa_email() -> str:
    """Return the service account address built from SA_NAME and PROJECT_ID."""
    return "{}@{}.iam.gserviceaccount.com".format(SA_NAME, PROJECT_ID)
191
192
def create_service_account() -> None:
    """Step 2: create the service account unless a lookup already finds it.

    The lookup tolerates HTTP 404 (account missing) and the creation
    tolerates HTTP 409 (account already exists).
    """
    email = sa_email()
    step(f"Step 2/6 — service account {email}")
    collection_url = (
        f"https://iam.googleapis.com/v1/projects/{PROJECT_ID}/serviceAccounts"
    )
    found = gcp_api("GET", f"{collection_url}/{email}", allow=(404,))
    if not found:
        gcp_api(
            "POST",
            collection_url,
            body={
                "accountId": SA_NAME,
                "serviceAccount": {"displayName": "SELECT replicator viewer"},
            },
            allow=(409,),
        )
        ok("created")
    else:
        ok("already exists")
214
215
def _add_iam_member(get_url: str, set_url: str, role: str, member: str) -> bool:
    """Ensure `member` holds `role` in the IAM policy behind the two URLs.

    The policy is fetched from `get_url`, edited in memory and written back
    to `set_url` only when an edit was needed.

    Returns:
        True when the policy was written (member added), False when the
        member was already listed under the first binding for `role`.
    """
    current = gcp_api("POST", get_url, body={}) or {}
    role_bindings = current.get("bindings", [])
    # Only the first binding carrying this role is considered.
    match = next(
        (entry for entry in role_bindings if entry.get("role") == role),
        None,
    )
    if match is None:
        role_bindings.append({"role": role, "members": [member]})
    elif member in match.get("members", []):
        return False
    else:
        match.setdefault("members", []).append(member)
    current["bindings"] = role_bindings
    gcp_api("POST", set_url, body={"policy": current})
    return True
232
233
def grant_project_iam() -> None:
    """Step 3: bind every role in PROJECT_ROLES to the service account on PROJECT_ID."""
    step(f"Step 3/6 — project IAM on {PROJECT_ID}")
    resource = f"https://cloudresourcemanager.googleapis.com/v1/projects/{PROJECT_ID}"
    principal = "serviceAccount:" + sa_email()
    for role in PROJECT_ROLES:
        added = _add_iam_member(
            f"{resource}:getIamPolicy", f"{resource}:setIamPolicy",
            role, principal,
        )
        if added:
            outcome = "granted"
        else:
            outcome = "(already present)"
        ok(f"{role} {outcome}")
242
243
def grant_org_iam() -> None:
    """Step 4: bind every role in ORG_ROLES to the service account on the organization."""
    step(f"Step 4/6 — org IAM on organizations/{ORG_ID}")
    resource = f"https://cloudresourcemanager.googleapis.com/v3/organizations/{ORG_ID}"
    principal = "serviceAccount:" + sa_email()
    for role in ORG_ROLES:
        added = _add_iam_member(
            f"{resource}:getIamPolicy", f"{resource}:setIamPolicy",
            role, principal,
        )
        if added:
            outcome = "granted"
        else:
            outcome = "(already present)"
        ok(f"{role} {outcome}")
252
253
def grant_dataset_iam() -> None:
    """Step 5: give the service account READER access on the billing dataset.

    Exits the process with status 1 when the dataset does not exist. Does
    nothing when a READER entry for the service account is already present.
    """
    bp, bd = parse_billing_dataset()
    step(f"Step 5/6 — bigquery.dataViewer on {bp}:{bd}")
    client = bigquery.Client(project=bp)
    try:
        dataset = client.get_dataset(bigquery.DatasetReference(bp, bd))
    except gax_exc.NotFound:
        fail(f"Dataset {bp}:{bd} does not exist. Configure the billing export "
             "in the Cloud Console first, then re-run.")
        sys.exit(1)

    email = sa_email()
    entries = list(dataset.access_entries)
    for entry in entries:
        if entry.role == "READER" and entry.entity_id == email:
            ok("READER already granted")
            return

    entries.append(
        bigquery.AccessEntry(role="READER", entity_type="userByEmail",
                             entity_id=email)
    )
    dataset.access_entries = entries
    # Send only the access list back, not the whole dataset resource.
    client.update_dataset(dataset, ["access_entries"])
    ok("READER granted")
277
278
def grant_token_creator() -> None:
    """Step 6: let SELECT_BACKEND_SA mint tokens for the service account.

    Adds roles/iam.serviceAccountTokenCreator for SELECT_BACKEND_SA to the
    service account's own IAM policy.

    NOTE(review): SELECT_BACKEND_SA is not defined in the part of the file
    reviewed here — confirm it exists at module level before running.
    """
    target = sa_email()
    step(f"Step 6/6 — allow {SELECT_BACKEND_SA} to impersonate {target}")
    resource = (
        f"https://iam.googleapis.com/v1/projects/{PROJECT_ID}/serviceAccounts/{target}"
    )
    added = _add_iam_member(
        f"{resource}:getIamPolicy",
        f"{resource}:setIamPolicy",
        "roles/iam.serviceAccountTokenCreator",
        f"serviceAccount:{SELECT_BACKEND_SA}",
    )
    if added:
        outcome = "granted"
    else:
        outcome = "(already present)"
    ok(f"tokenCreator {outcome}")
287
288
289
290
def _impersonated_client() -> bigquery.Client:
    """Build a BigQuery client for PROJECT_ID that acts as the service account.

    The caller's default credentials are the source; the impersonated
    credentials target sa_email() with the cloud-platform scope and a
    lifetime of 600.
    """
    cloud_platform = ["https://www.googleapis.com/auth/cloud-platform"]
    source, _project = google.auth.default(scopes=list(cloud_platform))
    impersonated = impersonated_credentials.Credentials(
        source_credentials=source,
        target_principal=sa_email(),
        target_scopes=list(cloud_platform),
        lifetime=600,
    )
    return bigquery.Client(credentials=impersonated, project=PROJECT_ID)
302
303
def _dry_run(client: bigquery.Client, label: str, sql: str) -> bool:
    """Submit `sql` as a dry-run query and report the outcome under `label`.

    Returns:
        True when the dry run is accepted (the estimated size is printed),
        False when any exception is raised (the error is printed).
    """
    config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    try:
        estimate = client.query(sql, job_config=config).total_bytes_processed
        gigabytes = (estimate or 0) / 1e9
        ok(f"{label} (dry-run, est. {gigabytes:.3f} GB if executed)")
    except Exception as exc:
        fail(f"{label}: {exc}")
        return False
    return True
314
315
def verify() -> bool:
    """Check, acting as the service account, that the granted access works.

    Phases:
      1. Run `SELECT 1` with retries until impersonation succeeds.
      2. Dry-run three INFORMATION_SCHEMA queries.
      3. List the billing dataset and read one row from the first
         gcp_billing_export_resource_v1_* table, if there is one.

    Returns:
        True when every check that ran succeeded. False when impersonation
        never worked, the dataset could not be listed, or any dry run or the
        billing read failed. A missing export table only produces a warning.
    """
    step("Verification — dry-runs (free) + one cheap read on billing export")

    # Phase 1: pauses (seconds) before each attempt; the first is immediate.
    backoff = [0, 15, 30, 30]
    client = _impersonated_client()
    confirmed = False
    for attempt, pause in enumerate(backoff, start=1):
        if pause:
            print(f" retrying impersonation in {pause}s "
                  f"(attempt {attempt}/{len(backoff)})...")
            time.sleep(pause)
        try:
            client.query("SELECT 1").result()
            ok(f"impersonation works (attempt {attempt})")
        except Exception as exc:
            last_err = exc
            continue
        confirmed = True
        break
    if not confirmed:
        fail(f"impersonation failed: {last_err!r}")
        return False

    # Phase 2: every dry run is attempted even after an earlier one fails.
    checks = [
        (
            "INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION",
            f"SELECT job_id FROM `{REGION}`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION "
            f"WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 MINUTE)",
        ),
        (
            "INFORMATION_SCHEMA.TABLE_STORAGE_BY_ORGANIZATION",
            f"SELECT table_schema FROM "
            f"`{REGION}`.INFORMATION_SCHEMA.TABLE_STORAGE_BY_ORGANIZATION",
        ),
        (
            f"INFORMATION_SCHEMA.SCHEMATA ({PROJECT_ID})",
            f"SELECT schema_name FROM "
            f"`{PROJECT_ID}.{REGION}`.INFORMATION_SCHEMA.SCHEMATA",
        ),
    ]
    all_ok = True
    for label, statement in checks:
        if not _dry_run(client, label, statement):
            all_ok = False

    # Phase 3: one real, partition-filtered read from the billing export.
    bp, bd = parse_billing_dataset()
    try:
        tables = list(client.list_tables(f"{bp}.{bd}"))
    except Exception as exc:
        fail(f"list tables in {bp}:{bd}: {exc}")
        return False

    billing = None
    for table in tables:
        if table.table_id.startswith("gcp_billing_export_resource_v1_"):
            billing = table
            break
    if billing is None:
        warn(f"no gcp_billing_export_resource_v1_* table in {bp}:{bd} yet — "
             "the first export can take up to 24h. Re-run verify later.")
        return all_ok

    sql = (
        f"SELECT cost FROM `{bp}.{bd}.{billing.table_id}` "
        f"WHERE DATE(_PARTITIONTIME) >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY) "
        f"LIMIT 1"
    )
    try:
        rows = list(client.query(sql).result(max_results=1))
        ok(f"read {len(rows)} row(s) from {billing.table_id} "
           f"(partition-pruned to last 2 days)")
    except Exception as exc:
        fail(f"billing read: {exc}")
        all_ok = False
    return all_ok
385
386
387
388
def main() -> None:
    """Run the six setup steps, verify them, and print the values to paste.

    Exit codes: 2 when the CONFIG placeholders were not edited, 1 when a
    setup step raises or any verification check fails. The summary block is
    printed even when verification fails.
    """
    still_placeholder = (
        PROJECT_ID == "customer-billing-project" or ORG_ID == "123456789012"
    )
    if still_placeholder:
        print("Edit the CONFIG block at the top of this script first.",
              file=sys.stderr)
        sys.exit(2)

    bp, bd = parse_billing_dataset()
    for line in (
        "SELECT for BigQuery — onboarding",
        f" PROJECT_ID: {PROJECT_ID}",
        f" ORG_ID: {ORG_ID}",
        f" BILLING_DATASET: {bp}:{bd}",
        f" SERVICE_ACCOUNT: {sa_email()}",
        f" REGION: {REGION}",
    ):
        print(line)

    setup_steps = (
        enable_apis,
        create_service_account,
        grant_project_iam,
        grant_org_iam,
        grant_dataset_iam,
        grant_token_creator,
    )
    try:
        for run_step in setup_steps:
            run_step()
    except Exception as exc:
        fail(f"setup failed: {exc}")
        sys.exit(1)

    verify_ok = verify()

    rule = "============================================================"
    print()
    for line in (
        rule,
        " Paste these into select.dev → Settings → BigQuery → Add Account:",
        rule,
        f" Organization ID: {ORG_ID}",
        f" Billing export project ID: {bp}",
        f" Billing export dataset name: {bd}",
        f" Service account email: {sa_email()}",
        rule,
    ):
        print(line)

    if verify_ok:
        return
    print("\nNote: one or more verification checks failed above. IAM grants")
    print("can take a couple minutes to propagate; if the failures look")
    print("transient, re-run the verify step.")
    sys.exit(1)
430
431
# Run the onboarding flow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()