How to write query result to Google Cloud Storage bucket directly?


Solution 1

Depending on your specific use case (frequency of the exports, their size, etc.), the solutions proposed in the answer by @GrahamPolley may work for you, although they would require more development work and attention.

The current possibility for writing query results is either to write them to a table or to download them locally, and even downloading directly to CSV has some limitations. Therefore, it is not possible to write query results to GCS in CSV format directly. However, there is a two-step solution consisting of:

  1. Write query results to a BQ table
  2. Export the data from the BQ table to a CSV file in GCS. Note that this feature has some limitations too, but they are less restrictive.

The following Python code can give you an idea of how to perform that task:

from google.cloud import bigquery
client = bigquery.Client()

# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
    location='US', # Location must match dataset
    job_config=job_config)
rows = list(query_job)  # Waits for the query to finish


# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US')
extract_job.result()  # Waits for job to complete

Note that, after that, you would have to delete the table (you can also do that programmatically). This may not be the best solution if you have to automate the process (if that is your use case, you may want to explore @Graham's solutions), but it will do the trick for a simple scenario.
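
As a minimal sketch, the cleanup step could look like this (it assumes the same client and the DATASET/TABLE placeholders used above; the not_found_ok argument is available in recent versions of google-cloud-bigquery):

# Delete the intermediate table once the extract job has finished
table_ref = client.dataset("DATASET").table("TABLE")
client.delete_table(table_ref, not_found_ok=True)  # ignore the error if the table is already gone
print("Deleted table {}".format(table_ref.path))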

Solution 2

BigQuery does not support writing its query results directly to GCS. You will have to write the results to a table, and then export the table to GCS after it's been materialised. You could possibly use Cloud Composer to orchestrate this for you.
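
If you go the Cloud Composer route, a minimal sketch of such a DAG could look like the following. The DAG id, project, dataset, table and bucket names are placeholders, and the exact operator import paths depend on the version of the Google provider package installed in your environment:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

with DAG("bq_query_to_gcs", schedule_interval=None, start_date=days_ago(1)) as dag:

    # Step 1: run the query and materialise the result into a staging table
    write_to_table = BigQueryInsertJobOperator(
        task_id="write_query_result",
        configuration={
            "query": {
                "query": "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10",
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": "PROJECT_ID",
                    "datasetId": "DATASET",
                    "tableId": "TABLE",
                },
                "writeDisposition": "WRITE_TRUNCATE",
            }
        },
    )

    # Step 2: export the staging table to a CSV file in GCS
    export_to_gcs = BigQueryToGCSOperator(
        task_id="export_table_to_gcs",
        source_project_dataset_table="PROJECT_ID.DATASET.TABLE",
        destination_cloud_storage_uris=["gs://BUCKET/FILE.csv"],
        export_format="CSV",
    )

    write_to_table >> export_to_gcs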

Or, you could use a Dataflow pipeline to achieve your desired result in one go, but this is a bit more work and will cost more money. The idea would be to write a pipeline that reads from BigQuery using your SQL query and then writes the results to GCS. It will also be slower, though.
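
A minimal sketch of such an Apache Beam pipeline might look like this (project, bucket and query are placeholders, ReadFromBigQuery requires a reasonably recent Beam version, and the naive CSV conversion below does no quoting and writes no header):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def row_to_csv(row):
    # ReadFromBigQuery yields each row as a dict; join the values into one CSV line
    return ','.join(str(value) for value in row.values())

options = PipelineOptions(
    project='PROJECT_ID',
    temp_location='gs://BUCKET/tmp',
    region='us-central1',
    runner='DataflowRunner')  # use 'DirectRunner' to test locally

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadFromBQ' >> beam.io.ReadFromBigQuery(
           query='SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
           use_standard_sql=True)
     | 'ToCsvLine' >> beam.Map(row_to_csv)
     | 'WriteToGCS' >> beam.io.WriteToText('gs://BUCKET/results', file_name_suffix='.csv'))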

Solution 3

@dsesto's answer was quite useful for me. I used his code and added some additional lines to query BigQuery, write the result to a table, export it to GCS, and import the result into a Dask DataFrame. The code is wrapped into a function.

def df_from_bq(query: str, table=None, compute=False):
    from time import gmtime, strftime
    from google.cloud import bigquery  # , storage
    import dask.dataframe as dd
    import gcsfs

    client = bigquery.Client.from_service_account_json('YOUR_PATH')  # Authentication to BQ using a service account key
    project = 'YOUR_PROJECT'

    # Create a custom table name if no name is given
    table_name = 'result_' + strftime("%Y%m%d_%H%M%S", gmtime()) if table is None else table

    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset("YOUR_DATASET").table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Create the table with the query result; overwrite it if it exists

    query_job = client.query(
        query,
        location='US',
        job_config=job_config)
    query_job.result()  # Wait for the query to finish
    print('Query results loaded to table {}'.format(table_ref.path))

    # Export the table to GCS as sharded CSV files
    destination_uri = "gs://YOUR_BUCKET/{}".format(table_name + '_*' + '.csv')
    dataset_ref = client.dataset("YOUR_DATASET", project=project)
    table_ref = dataset_ref.table(table_name)

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location='US')
    extract_job.result()  # Extract results to GCS

    print('Query results extracted to GCS: {}'.format(destination_uri))

    client.delete_table(table_ref)  # Delete the intermediate table in BQ

    print('Table {} deleted'.format(table_name))

    # Read the exported CSV files into a Dask DataFrame
    gcs = gcsfs.GCSFileSystem(project=project, token='cache')
    df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name + '_*' + '.csv'),
                     storage_options={'token': gcs.session.credentials})

    # storage_client = storage.Client.from_service_account_json('C:\\Users\\o.korshun\\Documents\\o.korshun.json')
    # bucket = storage_client.get_bucket('plarium-analytics')
    # blob = bucket.blob(table_name + '.csv')
    # blob.delete()  # Uncomment if you need to delete the blob after the DataFrame is created
    # print('Blob {} deleted'.format(table_name + '.csv'))

    print('Results imported to DD!')

    return df if not compute else df.compute().reset_index(drop=True)

Note that the table in BQ is deleted after the result has been exported to Cloud Storage.
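
For example, once the placeholders (YOUR_PATH, YOUR_PROJECT, YOUR_DATASET, YOUR_BUCKET) are replaced with your own values, the function can be called like this:

# Lazy Dask DataFrame backed by the CSV files in GCS
ddf = df_from_bq("SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 1000")

# Or compute it immediately into a pandas DataFrame
pdf = df_from_bq("SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 1000", compute=True)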

Solution 4

You can use BigQuery's EXPORT DATA statement to write query results to a GCS bucket directly from SQL:

from google.cloud import bigquery
from google.oauth2 import service_account


credentials = service_account.Credentials.from_service_account_file("dev-key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],)
client = bigquery.Client(credentials=credentials, project=credentials.project_id,)


bq_export_to_gs = """
EXPORT DATA OPTIONS(
  uri='gs://my-bucket/logs/edo/dengg_audit/bq-demo/temp4/*',
  format='CSV',
  overwrite=true,
  header=false,
  field_delimiter='^') AS
select col1, col2 from `project.schema.table` where clientguid = '1234' limit 10

"""

query_job= client.query(bq_export_to_gs)
results = query_job.result()
for row in results:
    print(row)

Solution 5

Solution: BigQuery result to Google Cloud Storage bucket directly

from google.cloud import bigquery
from google.cloud import storage

def export_to_gcs():
    QUERY = "SELECT * FROM TABLE where CONDITION" # change the table and where condition
    bq_client = bigquery.Client()
    query_job = bq_client.query(QUERY) # BigQuery API request
    rows_df = query_job.result().to_dataframe()
    
    storage_client = storage.Client() # Storage API request
    bucket = storage_client.get_bucket("BUCKET_NAME") # change the bucket name
    blob = bucket.blob('temp/Add_to_Cart.csv')
    blob.upload_from_string(rows_df.to_csv(sep=';',index=False,encoding='utf-8'),content_type='application/octet-stream')
    return "success"

Comments

  • bobby j
    bobby j almost 2 years
    from google.cloud import bigquery  
    query = """ select * from emp where emp_name=@emp_name""" 
    query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')] 
    job_config = bigquery.QueryJobConfig() 
    job_config.query_parameters = query_params  
    client = bigquery.Client() 
    query_job = client.query(query, job_config=job_config) 
    result = query_job.result()
    

    How can I write the result to Google Cloud Storage instead of writing it to the CSV and uploading it to cloud storage bucket?

  • Ben P
    Ben P almost 6 years
    Good example, and this could also be automated on a compute engine, as a simpler approach to a full Dataflow pipeline.
  • bobby j
    bobby j almost 6 years
  • Ben P
    Ben P almost 6 years
    @bobbychowdary Yes, you could also use that code, as this post describes, as the second part of the job.
  • bobby j
    bobby j almost 6 years
    @dsesto while using your approach i am getting this error AttributeError: 'module' object has no attribute 'WriteDisposition'
  • dsesto
    dsesto almost 6 years
    @bobbychowdary I would say the code you shared will not work for this case. The code in that link is used to create a random file in GCS. The one I shared is used to specifically export BQ tables to GCS.
  • dsesto
    dsesto almost 6 years
    @bobbychowdary where are you running this code from? Which version of the google-cloud-bigquery library are you using? You can maybe try upgrading the library to the latest version: pip install --upgrade google-cloud-bigquery. Look at this GitHub issue, where the absence of these constants in older versions of the library is discussed.
  • dsesto
    dsesto almost 6 years
    I am glad I could help! If the answer was useful for you, please consider accepting and/or upvoting my answer so that the community sees it solved your issue. Thanks!