In my Google Cloud Function (Python 3.7 runtime), I have created a function that tries to download all the .csv files from a Google Storage bucket into a pandas DataFrame (df). Once they are in the DataFrame, I intend to do some light ETL work on them and then convert everything back into one large .csv file to save to another bucket.

The problem I run into is that when I read the objects (converted to strings with file.download_as_string()) into read_csv(), I get an error related to io.StringIO: TypeError: initial_value must be str or None, not bytes.

In read_csv(io.StringIO(file_contents)..., does this have something to do with where I placed the io.StringIO method? Can anyone help me correct this error?

    def stage1slemonthly(data, context, source_bucket = 'my_source_bucket',  
    destination_bucket = 'gs://my destination_bucket'):   
 
 
        from google.cloud import storage 
        import pandas as pd 
        import pyspark 
        from pyspark.sql import SQLContext 
        import io 
 
        storage_client = storage.Client() 
 
        # source_bucket = data['bucket'] 
        # source_file = data['name'] 
        source_bucket = storage_client.bucket(source_bucket) 
 
        # load in the col names 
        col_names = ["Customer_Country_Number", "Customer_Name", "Exclude", 
             "SAP_Product_Name", "CP_Sku_Code", "Exclude", "UPC_Unit", 
             "UPC_Case", "Colgate_Month_Year", "Total_Cases", 
             "Promoted_Cases", "Non_Promoted_Cases", 
             "Planned_Non_Promoted_Cases", "Exclude", 
             "Lead_Measure", "Tons", "Pieces", "Liters", 
             "Tons_Conv_Revenue", "Volume_POS_Units", "Scan_Volume", 
             "WWhdrl_Volume", "Exclude", "Exclude", "Exclude", "Exclude", 
             "Exclude", "Exclude", "Exclude", "Exclude", "Investment_Buy", 
             "Exclude", "Exclude", "Gross_Sales", "Claim_Sales", 
             "Adjusted_Gross_Sales", "Exclude", "Exclude", 
             "Consumer_Investment", "Consumer_Allowance", 
             "Special_Pack_FG", "Coupons", "Contests_Offers",  
             "Consumer_Price_Reduction", "Permanent_Price_Reduction", 
             "Temporary_Price_Reduction", "TPR_Off_Invoice", "TPR_Scan", 
             "TPR_WWdrwl_Exfact", "Every_Day_Low_Price", "Closeouts", 
             "Inventory_Price_Reduction", "Exclude", "Customer_Investment", 
             "Prompt_Payment", "Efficiency_Drivers", "Efficient_Logistics", 
             "Efficient_Management", "Business_Builders_Direct", "Assortment", 
             "Customer_Promotions","Customer_Promotions_Terms", 
             "Customer_Promotions_Fixed", "Growth_Direct", 
             "New_Product_Incentives", "Free_Goods_Direct", 
             "Shopper_Marketing", "Business_Builders_Indirect", 
             "Middleman_Performance", "Middleman_Infrastructure", 
             "Growth_Indirect", "Indirect_Retailer_Investments", 
             "Free_Goods_Indirect", "Other_Customer_Investments", 
             "Product_Listing_Allowances", "Non_Performance_Trade_Payments", 
             "Exclude", "Variable_Rebate_Adjustment",  
             "Overlapping_OI_Adjustment", "Fixed_Accruals", 
             "Variable_Accruals", "Total_Accruals", "Gross_To_Net", 
             "Invoiced_Sales", "Exclude", "Exclude", "Net_Sales", 
             "Exclude", "Exclude", "Exclude", "Exclude", "Exclude", 
             "Exclude", "Exclude", "Exclude", "Exclude", 
             "Total_Variable_Cost", "Margin", "Exclude"] 
 
        df = pd.DataFrame(columns=[col_names]) 
 
        for file in list(source_bucket.list_blobs()): 
          file_contents = file.download_as_string()  
          df = df.append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names])) 
 
        df = df.reset_index(drop=True) 
 
        # do ETL work here in future 
 
        sc = pyspark.SparkContext.getOrCreate() 
        sqlCtx = SQLContext(sc) 
        sparkDf = sqlCtx.createDataFrame(df) 
        sparkDf.coalesce(1).write.option("header", "true").csv(destination_bucket) 
 

When I run it, I get the following error message...

Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 383, in run_background_function
    _function_handler.invoke_user_function(event_object)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 217, in invoke_user_function
    return call_user_function(request_or_event)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 214, in call_user_function
    event_context.Context(**request_or_event.context))
  File "/user_code/main.py", line 56, in stage1slemonthly
    df = df.append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names]))
TypeError: initial_value must be str or None, not bytes

Please refer to the following approach:

You get this error because file.download_as_string() returns a value of type bytes, not str, and you cannot pass bytes to io.StringIO as its initial_value (initial_value=file_contents).
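
For illustration, here is a minimal sketch of two ways to make the buffer type match what read_csv() expects (the bytes literal is a made-up stand-in for the downloaded file contents):

    import io
    import pandas as pd

    # hypothetical bytes payload, standing in for file.download_as_string()
    file_contents = b"a,b,c\n1,2,3\n"

    # Option 1: decode the bytes to str so io.StringIO gets what it expects
    df = pd.read_csv(io.StringIO(file_contents.decode("utf-8")))

    # Option 2: keep the bytes and wrap them in io.BytesIO instead,
    # which pandas read_csv accepts just as well
    df = pd.read_csv(io.BytesIO(file_contents))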

Moreover, col_names is already defined as a list here, so writing pd.DataFrame(columns=[col_names]) and pd.read_csv(..., names=[col_names]) is not correct: you should use col_names instead of [col_names].
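
A quick way to see the difference (toy column names, for illustration only):

    import pandas as pd

    col_names = ["a", "b"]  # toy names, for illustration only

    # [col_names] is a nested list, which pandas turns into a
    # one-level MultiIndex of tuples: ('a',), ('b',)
    print(pd.DataFrame(columns=[col_names]).columns)

    # passing the list directly yields the intended flat labels:
    # Index(['a', 'b'], dtype='object')
    print(pd.DataFrame(columns=col_names).columns)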

In any case, this does not seem to be the right way to read CSV files from Google Cloud Storage anyway. You would rather write:

from google.cloud import storage 
import pandas as pd 
import io 
 
storage_client = storage.Client() 
 
source_bucket = storage_client.bucket(source_bucket) 
 
col_names = ["Customer_Country_Number", "Customer_Name", ...] 
 
df = pd.DataFrame(columns=col_names) 
 
for file in list(source_bucket.list_blobs()): 
file_path = "gs://{}/{}".format(file.bucket.name, file.name) 
    df = df.append(pd.read_csv(file_path, header=None, names=col_names)) 
 
# the rest of your code 
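
A side note, in case you are on a newer pandas: DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, so the loop above would be written as a single pd.concat instead, something like:

    # collect one DataFrame per blob, then concatenate once
    frames = [
        pd.read_csv("gs://{}/{}".format(f.bucket.name, f.name),
                    header=None, names=col_names)
        for f in source_bucket.list_blobs()
    ]
    df = pd.concat(frames, ignore_index=True)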

Indeed, you can read files directly from GCS with pandas' read_csv method instead of downloading the files and then loading them, but you need to install gcsfs (pip3 install gcsfs) first.
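
Note that in the Cloud Functions runtime you do not run pip3 yourself; dependencies go into the requirements.txt deployed alongside the function, roughly like this (versions left unpinned for illustration):

    # requirements.txt
    google-cloud-storage
    pandas
    gcsfs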

