Issue
I'm trying to use an Apache Beam pipeline to write entities to the Google Cloud Datastore. For testing, I'm doing this in a local Python 2.7 virtual environment that is set up following the Apache Beam instructions. Coding is done locally in a Jupyter notebook. Here's the pseudocode I'm trying:
import apache_beam as beam
#from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud import datastore
from google.cloud.datastore.entity import Entity
# No project is passed to the client here; the projectId will be taken
# from the environment.
storage = datastore.Client()
# The kind for the new entity; create_entity() uses it when building keys.
gds_entity_kind = 'test_entity'
class PrintFn(beam.DoFn):
    """Debugging DoFn that prints every element it receives."""

    def process(self, element):
        """Print ``element``; the method returns ``None``."""
        print(element)
def create_entity(entity_id, name):
    """Build a Datastore ``Entity`` of kind ``gds_entity_kind``.

    ``entity_id`` is converted with ``int()`` before it is placed in the
    key; ``name`` is stored under the ``'name'`` property.
    """
    entity_key = storage.key(gds_entity_kind, int(entity_id))
    new_entity = Entity(key=entity_key)
    new_entity.update({'name': name})
    return new_entity
# Sample input: one record per string, fields separated by ';' and each
# field wrapped in single quotes.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]

# Split each record into fields, strip the quotes, build an entity from the
# fields and print it. The Datastore write step is left commented out.
with beam.Pipeline() as p:
    (
        p
        | 'read lines' >> beam.Create(lines)
        | 'rows to columns' >> beam.Map(lambda row: row.split(';'))
        | 'remove quotes' >> beam.Map(
            lambda columns: [column.strip('\'') for column in columns])
        | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
        # | 'write to datastore' >> WriteToDatastore()
        | 'debug print' >> beam.ParDo(PrintFn())
    )
I found this posting with a similar problem, but the answer seems unrelated to my situation. I figured out that the problem seems to be related to the import statement
from google.cloud import datastore
in relation to
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
If I restart the kernel and import only WriteToDatastore, then I don't get any error. If I try to import both, I get this error. Any help is appreciated!
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-10-45d84b2c60ba> in <module>()
1 import apache_beam as beam
----> 2 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
3 from google.cloud import datastore
4 from google.cloud.datastore.entity import Entity
5
/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py in <module>()
21 import time
22
---> 23 from apache_beam.io.gcp.datastore.v1 import helper
24 from apache_beam.io.gcp.datastore.v1 import query_splitter
25 from apache_beam.io.gcp.datastore.v1 import util
/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/helper.py in <module>()
34 # pylint: disable=wrong-import-order, wrong-import-position
35 try:
---> 36 from google.cloud.proto.datastore.v1 import datastore_pb2
37 from google.cloud.proto.datastore.v1 import entity_pb2
38 from google.cloud.proto.datastore.v1 import query_pb2
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/datastore_pb2.py in <module>()
15
16 from google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
---> 17 from google.cloud.proto.datastore.v1 import entity_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_entity__pb2
18 from google.cloud.proto.datastore.v1 import query_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_query__pb2
19
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/entity_pb2.py in <module>()
26 serialized_pb=_b('\n,google/cloud/proto/datastore/v1/entity.proto\x12\x13google.datastore.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x18google/type/latlng.proto\"7\n\x0bPartitionId\x12\x12\n\nproject_id\x18\x02 \x01(\t\x12\x14\n\x0cnamespace_id\x18\x04 \x01(\t\"\xb7\x01\n\x03Key\x12\x36\n\x0cpartition_id\x18\x01 \x01(\x0b\x32 .google.datastore.v1.PartitionId\x12\x32\n\x04path\x18\x02 \x03(\x0b\x32$.google.datastore.v1.Key.PathElement\x1a\x44\n\x0bPathElement\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0c\n\x02id\x18\x02 \x01(\x03H\x00\x12\x0e\n\x04name\x18\x03 \x01(\tH\x00\x42\t\n\x07id_type\"8\n\nArrayValue\x12*\n\x06values\x18\x01 \x03(\x0b\x32\x1a.google.datastore.v1.Value\"\xf1\x03\n\x05Value\x12\x30\n\nnull_value\x18\x0b \x01(\x0e\x32\x1a.google.protobuf.NullValueH\x00\x12\x17\n\rboolean_value\x18\x01 \x01(\x08H\x00\x12\x17\n\rinteger_value\x18\x02 \x01(\x03H\x00\x12\x16\n\x0c\x64ouble_value\x18\x03 \x01(\x01H\x00\x12\x35\n\x0ftimestamp_value\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12-\n\tkey_value\x18\x05 \x01(\x0b\x32\x18.google.datastore.v1.KeyH\x00\x12\x16\n\x0cstring_value\x18\x11 \x01(\tH\x00\x12\x14\n\nblob_value\x18\x12 \x01(\x0cH\x00\x12.\n\x0fgeo_point_value\x18\x08 \x01(\x0b\x32\x13.google.type.LatLngH\x00\x12\x33\n\x0c\x65ntity_value\x18\x06 \x01(\x0b\x32\x1b.google.datastore.v1.EntityH\x00\x12\x36\n\x0b\x61rray_value\x18\t \x01(\x0b\x32\x1f.google.datastore.v1.ArrayValueH\x00\x12\x0f\n\x07meaning\x18\x0e \x01(\x05\x12\x1c\n\x14\x65xclude_from_indexes\x18\x13 \x01(\x08\x42\x0c\n\nvalue_type\"\xbf\x01\n\x06\x45ntity\x12%\n\x03key\x18\x01 \x01(\x0b\x32\x18.google.datastore.v1.Key\x12?\n\nproperties\x18\x03 \x03(\x0b\x32+.google.datastore.v1.Entity.PropertiesEntry\x1aM\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 
\x01(\x0b\x32\x1a.google.datastore.v1.Value:\x02\x38\x01\x42\x82\x01\n\x17\x63om.google.datastore.v1B\x0b\x45ntityProtoP\x01Z<google.golang.org/genproto/googleapis/datastore/v1;datastore\xaa\x02\x19Google.Cloud.Datastore.V1b\x06proto3')
27 ,
---> 28 dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
29 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
30
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/protobuf/descriptor.pyc in __new__(cls, name, package, options, serialized_pb, dependencies, public_dependencies, syntax, pool)
827 # TODO(amauryfa): use the pool passed as argument. This will work only
828 # for C++-implemented DescriptorPools.
--> 829 return _message.default_pool.AddSerializedFile(serialized_pb)
830 else:
831 return super(FileDescriptor, cls).__new__(cls)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/proto/datastore/v1/entity.proto":
google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
Solution
I figured out that the programming concept for using Google Cloud Datastore with Apache Beam (Google Cloud Dataflow) differs from the default Datastore API.
You need to use the Datastore helper as given in this example. With this I was able to change my code so that it now runs successfully. Notice the different import for the entity and the different entity creation process.
In summary, it creates the entity in the fully qualified JSON notation that you will also see when exploring the Datastore in the Google Cloud Console. My original code created the entity in a much simpler JSON form that is usually also understood when writing to the Datastore. Overall, I avoided the conflicting dependencies of the two different imports in my original code that caused the error.
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper, PropertyFilter
#from google.cloud import datastore
#from google.cloud.datastore.entity import Entity
def create_entity(_id, name):
    """Build an ``entity_pb2.Entity`` of kind ``"test_entity"``.

    The key path and the ``"name"`` property are filled in through the
    ``googledatastore`` helper functions rather than set directly.
    """
    new_entity = entity_pb2.Entity()
    # NOTE(review): _id is forwarded unconverted and the pipeline supplies a
    # string, so this presumably produces a name-based key rather than a
    # numeric id -- confirm that is intended.
    datastore_helper.add_key_path(new_entity.key, "test_entity", _id)
    datastore_helper.add_properties(new_entity, {"name": unicode(name)})
    return new_entity
class PrintFn(beam.DoFn):
    """Debugging DoFn that prints every element it receives."""

    def process(self, element):
        """Print ``element``; the method returns ``None``."""
        print(element)
# Placeholder project id; it is passed to WriteToDatastore below.
project_id = 'your-gcp-project-id'

# Sample input: one record per string, fields separated by ';' and each
# field wrapped in single quotes.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]

# Split each record into fields, strip the quotes, build an entity from the
# fields and write it to Datastore. The debug print step is commented out.
with beam.Pipeline() as p:
    (
        p
        | 'read lines' >> beam.Create(lines)
        | 'rows to columns' >> beam.Map(lambda row: row.split(';'))
        | 'remove quotes' >> beam.Map(
            lambda columns: [column.strip('\'') for column in columns])
        | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
        | 'write to datastore' >> WriteToDatastore(project_id)
        # | 'debug print' >> beam.ParDo(PrintFn())
    )
Answered By - Matthias
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.