Integration
The following is a quick-start guide to integrating with the NanoPing gRPC API. All example code is written in Go.
Connect to the GRPC API Server
Every node can host a GRPC API Server.
To make a NanoPing client host a GRPC API Server, use the --grpc flag with the up command.
np up --grpc 127.0.0.1:10432
To make a NanoPing controller host a GRPC API Server, add the grpcBridge section to the NanoPing controller config file.
node:
  name: My Controller
http:
  address: 127.0.0.1:8769
hubServer:
  authenticationByRequest:
    timeout: 5m
grpcBridge:
  address: 127.0.0.1:10564
// Connect to the NanoPing Client gRPC server
clientConnection, err := grpc.NewClient("127.0.0.1:10432", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	panic(err)
}
// Connect to the NanoPing Controller gRPC server
controllerConnection, err := grpc.NewClient("127.0.0.1:10564", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	panic(err)
}
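In grpc-go, grpc.NewClient does not open a connection by itself; the channel connects when the first RPC is made. A wrong address therefore tends to surface later than expected. As a minimal sketch, a plain TCP probe can confirm that something is listening before you continue. The helper name waitForTCP and its retry intervals are hypothetical and not part of NanoPing or grpc-go; the address is the client address used in this guide.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// waitForTCP is a hypothetical helper (not part of NanoPing or grpc-go).
// It retries a plain TCP dial until the address accepts a connection or
// the overall timeout elapses.
func waitForTCP(address string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		conn, err := net.DialTimeout("tcp", address, 500*time.Millisecond)
		if err == nil {
			// The port is open; close the probe connection right away.
			return conn.Close()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("%s not reachable after %s: %w", address, timeout, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	// 127.0.0.1:10432 is the client gRPC address used in this guide.
	if err := waitForTCP("127.0.0.1:10432", 3*time.Second); err != nil {
		fmt.Println("gRPC API server is not up yet:", err)
		return
	}
	fmt.Println("gRPC API server is accepting connections")
}
```

The probe only shows that the port is open. It does not verify that the listener is actually a NanoPing gRPC API server.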
Setup node
When a NanoPing Client or Controller starts, its node name defaults to the hostname of the machine it runs on. You can override this by calling the SetNode method on the Hub Client Service. The node name identifies the node in the control plane. You can also set metadata on the node to identify it in more detail.
Note that a controller cannot set its node information via the gRPC API; it is set in the YAML controller config file when the controller starts.
// Set the node name and metadata for the client
func setClientNodeInformation(clientConnection *grpc.ClientConn) {
	// Create GRPC Service client for the Hub Client
	hubClientService := grpcHubClient.NewHubClientServiceClient(clientConnection)
	// Set the node name and metadata
	_, err := hubClientService.SetNode(context.Background(), &grpcHubClient.SetNodeRequest{
		Name: "My NanoPing Client",
		Metadata: &grpcHubNode.NodeMetadata{
			Items: map[string]*grpcHubNode.NodeMetadataItem{
				"type":              {Value: &grpcHubNode.NodeMetadataItem_StringValue{StringValue: "my-type"}},
				"reference":         {Value: &grpcHubNode.NodeMetadataItem_StringValue{StringValue: "my-reference"}},
				"location":          {Value: &grpcHubNode.NodeMetadataItem_StringValue{StringValue: "Denmark, Aalborg"}},
				"random_zero_bytes": {Value: &grpcHubNode.NodeMetadataItem_BytesValue{BytesValue: make([]byte, 10)}},
			},
		},
	})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful SetNode: %s", err))
	}
}
Authenticate node to the control plane
To authenticate to the control plane, a node must use one of the authentication methods supported by the Hub Client. One example is Authentication By Request, which creates an authentication request on the Hub Server that can be accepted or denied. Once the request has been accepted, the node is authenticated and can start communicating with the other nodes in the cluster. The authentication is persisted in the node's application data directory, so the node automatically connects to the control plane whenever it is started.
// Authenticate the client to the Hub Server
func authenticateClient(clientConnection *grpc.ClientConn, controllerHttpAddress string) {
	// Create GRPC Service client for the Hub Client
	hubClientService := grpcHubClient.NewHubClientServiceClient(clientConnection)
	// Authenticate the Hub Client to the Hub Server. This will block until the request has been accepted, rejected, or times out.
	_, err := hubClientService.AuthenticateByRequest(context.Background(), &grpcHubClient.AuthenticateByRequestRequest{
		HubServerAddress: controllerHttpAddress,
	})
	// Ignore the error if the client is already authenticated. Compare the gRPC status code
	// instead of the error string; status and codes are the google.golang.org/grpc/status
	// and google.golang.org/grpc/codes packages.
	if err != nil && status.Code(err) != codes.AlreadyExists {
		panic(fmt.Sprintf("Unsuccessful AuthenticateByRequest: %s", err))
	}
	fmt.Printf("Authenticated node with the Hub Server\n\n")
}
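Because AuthenticateByRequest blocks until the request is answered, it can help to bound the wait with a context deadline instead of context.Background(). The sketch below illustrates the pattern with a stand-in function; blockingAuthenticate and authenticateWithTimeout are hypothetical names, and the channel merely simulates an operator answering the request. With a real gRPC call, an expired deadline is reported as a gRPC status with code DeadlineExceeded rather than the bare context error shown here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// blockingAuthenticate is a hypothetical stand-in for a blocking RPC such as
// AuthenticateByRequest. It waits until the request is answered (simulated by
// the answered channel) or the context ends.
func blockingAuthenticate(ctx context.Context, answered <-chan struct{}) error {
	select {
	case <-answered:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// authenticateWithTimeout bounds the blocking call with a deadline.
func authenticateWithTimeout(timeout time.Duration, answered <-chan struct{}) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // always release the timer held by the context
	return blockingAuthenticate(ctx, answered)
}

func main() {
	// Nobody answers the request, so the call gives up after the timeout.
	err := authenticateWithTimeout(50*time.Millisecond, make(chan struct{}))
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("authentication request timed out")
	}

	// A closed channel simulates a request that has already been accepted.
	accepted := make(chan struct{})
	close(accepted)
	if err := authenticateWithTimeout(time.Second, accepted); err == nil {
		fmt.Println("authentication request accepted")
	}
}
```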
Accepting authentication requests
When a node is trying to authenticate to the control plane, the request has to be accepted or denied. The following code streams a single request from the Hub Server and accepts it.
func acceptAuthenticationRequests(controllerConnection *grpc.ClientConn) {
	// Create GRPC Service client for the Hub Server
	hubServerService := grpcHubServer.NewHubServerServiceClient(controllerConnection)
	// Listen for an authentication request and accept it
	stream, err := hubServerService.AuthenticationByRequests(context.Background())
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful AuthenticationByRequests: %s", err))
	}
	// Read the next request
	request, err := stream.Recv()
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful Recv: %s", err))
	}
	// Accept the request
	err = stream.Send(&grpcHubServer.AuthenticationByRequestAnswer{
		RequestId: request.RequestId,
		Accept:    true,
	})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful Send: %s", err))
	}
}
To continuously accept all requests, place the read and send in a loop.
for {
	// Read the next request
	request, err := stream.Recv()
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful Recv: %s", err))
	}
	// Accept the request
	err = stream.Send(&grpcHubServer.AuthenticationByRequestAnswer{
		RequestId: request.RequestId,
		Accept:    true,
	})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful Send: %s", err))
	}
}
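The loop above panics on every Recv error, including the io.EOF that a gRPC stream returns when the server closes it normally. A long-running accept loop may prefer to end cleanly on io.EOF and to decide per request whether to accept. The following is a minimal sketch of that shape using simplified stand-in types: authRequest, authAnswer, authStream, fakeStream, and the decide callback are all hypothetical and only mirror the Recv/Send shape of the generated stream.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// authRequest and authAnswer are simplified stand-ins for the generated
// request message and AuthenticationByRequestAnswer.
type authRequest struct{ RequestId string }
type authAnswer struct {
	RequestId string
	Accept    bool
}

// authStream mirrors the Recv/Send shape of a bidirectional gRPC stream.
type authStream interface {
	Recv() (*authRequest, error)
	Send(*authAnswer) error
}

// answerRequests answers every request until the stream ends. The decide
// callback is a hypothetical policy hook, not part of the NanoPing API.
func answerRequests(stream authStream, decide func(*authRequest) bool) (int, error) {
	answered := 0
	for {
		request, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return answered, nil // the stream was closed cleanly
		}
		if err != nil {
			return answered, fmt.Errorf("recv: %w", err)
		}
		answer := &authAnswer{RequestId: request.RequestId, Accept: decide(request)}
		if err := stream.Send(answer); err != nil {
			return answered, fmt.Errorf("send: %w", err)
		}
		answered++
	}
}

// fakeStream replays a fixed list of requests and records the answers.
type fakeStream struct {
	pending []*authRequest
	sent    []*authAnswer
}

func (f *fakeStream) Recv() (*authRequest, error) {
	if len(f.pending) == 0 {
		return nil, io.EOF
	}
	next := f.pending[0]
	f.pending = f.pending[1:]
	return next, nil
}

func (f *fakeStream) Send(a *authAnswer) error {
	f.sent = append(f.sent, a)
	return nil
}

func main() {
	stream := &fakeStream{pending: []*authRequest{{RequestId: "a"}, {RequestId: "b"}}}
	// Example policy: accept only the request with id "a".
	count, err := answerRequests(stream, func(r *authRequest) bool { return r.RequestId == "a" })
	fmt.Println("answered:", count, "error:", err)
	for _, a := range stream.sent {
		fmt.Printf("request %s accepted=%t\n", a.RequestId, a.Accept)
	}
}
```

Returning errors instead of panicking lets the caller decide whether to reopen the stream or shut down.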
Observe connection state of the hub client
The connection state of the hub client can be observed by listening to the connection state stream.
func observeHubClientConnectionState(clientConnection *grpc.ClientConn) {
	// Create GRPC Service client for the Hub Client
	hubClientService := grpcHubClient.NewHubClientServiceClient(clientConnection)
	// Observe the connection state of the Hub Client
	connectionStateStream, err := hubClientService.StreamConnectionState(context.Background(), &grpcHubClient.StreamConnectionStateRequest{})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful StreamConnectionState: %s", err))
	}
	go func() {
		for {
			// Use a distinct name so the stream variable is not shadowed
			connectionState, err := connectionStateStream.Recv()
			if err != nil {
				return
			}
			fmt.Printf("Connection state: %s\n\n", connectionState.State)
		}
	}()
}
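Besides printing every state change, the same stream can be used to block until the hub client reaches a particular state. The sketch below assumes the state can be compared as a string such as CONNECTED (the name that appears in the sample output at the end of this guide). waitForState and replayStates are hypothetical helpers; in real code, recv would be a small closure around the stream's Recv method.

```go
package main

import (
	"fmt"
	"io"
)

// waitForState reads states from recv until target is seen. recv is a
// hypothetical stand-in for a closure around the stream's Recv method.
func waitForState(recv func() (string, error), target string) error {
	for {
		state, err := recv()
		if err != nil {
			return fmt.Errorf("stream ended before reaching %s: %w", target, err)
		}
		if state == target {
			return nil
		}
	}
}

// replayStates returns a recv function that yields the given states in
// order and then reports io.EOF, simulating a stream that closes.
func replayStates(states ...string) func() (string, error) {
	return func() (string, error) {
		if len(states) == 0 {
			return "", io.EOF
		}
		next := states[0]
		states = states[1:]
		return next, nil
	}
}

func main() {
	recv := replayStates("UNAUTHENTICATED", "CONNECTED")
	if err := waitForState(recv, "CONNECTED"); err != nil {
		fmt.Println("not connected:", err)
		return
	}
	fmt.Println("hub client is connected")
}
```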
Fetch all nodes connected to the hub
The Hub Server is accessible from every node, and can be used to query and fetch information about the nodes connected to the control plane.
// Fetch all nodes connected to the hub and print their names
func printNamesOfAllNodes(clientConnection *grpc.ClientConn) {
	// Create GRPC Service client for the Hub Server
	hubServerService := grpcHubServer.NewHubServerServiceClient(clientConnection)
	// Fetch all nodes connected to the hub
	nodes, err := hubServerService.GetNodes(context.Background(), &grpcHubServer.GetNodesRequest{})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful GetNodes: %s", err))
	}
	fmt.Println("All nodes:")
	for _, node := range nodes.Nodes {
		fmt.Printf(" Node: %s\n", node.Name)
	}
	fmt.Println()
}
Find all nodes with metadata "type" set to "my-type"
You can use metadata filters to filter the nodes connected to the hub. This is useful when you want to find all nodes that have a specific metadata value set.
// Fetch all nodes connected to the hub with metadata "type" set to "my-type" and print their names
func printNamesOfAllNodesWithMatchingMetadata(clientConnection *grpc.ClientConn) {
	// Create GRPC Service client for the Hub Server
	hubServerService := grpcHubServer.NewHubServerServiceClient(clientConnection)
	// Fetch all nodes connected to the hub with metadata "type" set to "my-type"
	nodes, err := hubServerService.GetNodes(context.Background(), &grpcHubServer.GetNodesRequest{
		MetadataFilters: map[string]*grpcHubNode.NodeMetadataItem{
			"type": {Value: &grpcHubNode.NodeMetadataItem_StringValue{StringValue: "my-type"}},
		},
	})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful GetNodes: %s", err))
	}
	fmt.Println("Nodes with metadata type=my-type:")
	for _, node := range nodes.Nodes {
		fmt.Printf(" Node: %s\n", node.Name)
	}
	fmt.Println()
}
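This guide does not spell out how multiple metadata filters combine. One plausible reading, assumed here and not confirmed by the source, is that a node matches only when every filter key is present with an equal value. The sketch below expresses that rule as client-side code, which can also serve as a fallback when you need to post-filter an unfiltered GetNodes result. The node type is a simplified stand-in for grpcHubNode.Node that models string metadata only; matchesAll and filterNodes are hypothetical helpers.

```go
package main

import "fmt"

// node is a simplified stand-in for grpcHubNode.Node; only string metadata
// values are modelled here.
type node struct {
	Name     string
	Metadata map[string]string
}

// matchesAll reports whether the node carries every filter key with an equal
// value. This AND/equality rule is an assumption of the sketch.
func matchesAll(n node, filters map[string]string) bool {
	for key, want := range filters {
		if got, ok := n.Metadata[key]; !ok || got != want {
			return false
		}
	}
	return true
}

// filterNodes returns the nodes that satisfy all filters, preserving order.
func filterNodes(nodes []node, filters map[string]string) []node {
	var matched []node
	for _, n := range nodes {
		if matchesAll(n, filters) {
			matched = append(matched, n)
		}
	}
	return matched
}

func main() {
	nodes := []node{
		{Name: "My Controller"},
		{Name: "My NanoPing Client", Metadata: map[string]string{
			"type":     "my-type",
			"location": "Denmark, Aalborg",
		}},
	}
	for _, n := range filterNodes(nodes, map[string]string{"type": "my-type"}) {
		fmt.Println("Node:", n.Name)
	}
}
```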
Find all nodes with the pipelines service enabled
All pipelines are accessible from every node in the cluster, and the control plane takes care of routing each request to the correct node. You first need to know which nodes have the pipelines service running; service filters let you find them.
// Get all nodes connected to the hub that have the "pipelines" service running
func getNodesWithPipelineService(clientConnection *grpc.ClientConn) (pipelineNodes []*grpcHubNode.Node) {
	// Create GRPC Service client for the Hub Server
	hubServerService := grpcHubServer.NewHubServerServiceClient(clientConnection)
	// Fetch all nodes connected to the hub with "pipelines" service running
	response, err := hubServerService.GetNodes(context.Background(), &grpcHubServer.GetNodesRequest{
		ServiceFilters: []grpcHubNode.NodeService{
			grpcHubNode.NodeService_PIPELINES,
		},
	})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful GetNodes: %s", err))
	}
	// Print the names of all nodes that have the pipelines service enabled
	fmt.Println("Nodes with Pipelines service enabled:")
	for _, node := range response.Nodes {
		fmt.Printf(" Node: %s\n", node.Name)
		pipelineNodes = append(pipelineNodes, node)
	}
	fmt.Println()
	return pipelineNodes
}
Do pipeline CRUD operations
You can create, read, update, and delete pipelines on any node in the cluster by using the node ID.
// For every pipeline node we will create a pipeline, update the pipeline, get the pipeline, and delete the pipeline.
func doPipelineCRUDOperations(clientConnection *grpc.ClientConn, pipelineNodes []*grpcHubNode.Node) {
	// Create pipelines service client
	pipelinesService := grpcPipelines.NewPipelinesServiceClient(clientConnection)
	for _, node := range pipelineNodes {
		fmt.Printf("Doing CRUD pipeline operations on node %s\n", node.Name)
		// Create a pipeline.
		// test_json_config is not defined in this snippet; it holds the pipeline's JSON configuration.
		createResponse, err := pipelinesService.CreatePipeline(context.Background(), &grpcPipelines.CreatePipelineRequest{
			NodeId:     node.Id,
			JsonConfig: test_json_config,
			Name:       "My First Pipeline",
			OptionalDefaultLoggingLevel: &grpcPipelines.CreatePipelineRequest_DefaultLoggingLevel{
				DefaultLoggingLevel: grpcLogging.Level_DEBUG,
			},
			RestartPolicy: &grpcPipelines.RestartPolicy{
				RestartPolicy: &grpcPipelines.RestartPolicy_OnFailure{
					OnFailure: &grpcPipelines.RestartPolicyOnFailure{
						MaxRestarts: 3,
					},
				},
			},
			OptionalInstructionsTimeout: &grpcPipelines.CreatePipelineRequest_InstructionsTimeout{
				InstructionsTimeout: int32(15),
			},
		})
		if err != nil {
			panic(fmt.Sprintf("Unsuccessful CreatePipeline: %s", err))
		}
		fmt.Printf(" Created pipeline %s with id %s\n", createResponse.Pipeline.Name, createResponse.Pipeline.Id)
		// Update the pipeline by changing the name but keeping the rest of the configuration
		updatedResponse, err := pipelinesService.UpdatePipeline(context.Background(), &grpcPipelines.UpdatePipelineRequest{
			NodeId:              node.Id,
			Id:                  createResponse.Pipeline.Id,
			JsonConfig:          createResponse.Pipeline.JsonConfig,
			Name:                "My Updated Pipeline",
			RestartPolicy:       createResponse.Pipeline.RestartPolicy,
			DefaultLoggingLevel: createResponse.Pipeline.DefaultLoggingLevel,
			InstructionsTimeout: createResponse.Pipeline.InstructionsTimeout,
		})
		if err != nil {
			panic(fmt.Sprintf("Unsuccessful UpdatePipeline: %s", err))
		}
		fmt.Printf(" Updated pipeline to name %s for id %s\n", updatedResponse.Pipeline.Name, updatedResponse.Pipeline.Id)
		// Get the pipeline by id
		getByIdResponse, err := pipelinesService.GetPipelineById(context.Background(), &grpcPipelines.GetPipelineByIdRequest{
			NodeId: node.Id,
			Id:     updatedResponse.Pipeline.Id,
		})
		if err != nil {
			panic(fmt.Sprintf("Unsuccessful GetPipelineById: %s", err))
		}
		fmt.Printf(" Got pipeline by id %s with name %s\n", getByIdResponse.Pipeline.Id, getByIdResponse.Pipeline.Name)
		// Delete the pipeline
		_, err = pipelinesService.DeletePipeline(context.Background(), &grpcPipelines.DeletePipelineRequest{
			NodeId: node.Id,
			Id:     getByIdResponse.Pipeline.Id,
		})
		if err != nil {
			panic(fmt.Sprintf("Unsuccessful DeletePipeline: %s", err))
		}
		fmt.Printf(" Deleted pipeline with id %s\n\n", getByIdResponse.Pipeline.Id)
	}
}
Listen for pipeline events
You can listen for pipeline events by using the StreamPipelines method on the Pipelines service. An event is sent every time a pipeline is created, updated, or deleted. Events arrive as a stream of messages, so you can read them in a loop.
func observePipelineEvents(clientConnection *grpc.ClientConn, nodeId string) {
	fmt.Printf("Observing pipeline events for node %s\n", nodeId)
	// Create GRPC Service client for the Pipelines service
	pipelinesService := grpcPipelines.NewPipelinesServiceClient(clientConnection)
	// Observe pipeline events
	stream, err := pipelinesService.StreamPipelines(context.Background(), &grpcPipelines.StreamPipelinesRequest{
		NodeId: nodeId,
	})
	if err != nil {
		panic(fmt.Sprintf("Unsuccessful StreamPipelines: %s", err))
	}
	// Create goroutine to listen for events
	go func() {
		for {
			event, err := stream.Recv()
			if err != nil {
				return
			}
			switch e := event.Event.(type) {
			case *grpcPipelines.StreamPipelinesEvent_Create:
				fmt.Printf("Received create event: %s\n", e.Create.Pipeline.Id)
			case *grpcPipelines.StreamPipelinesEvent_Update:
				fmt.Printf("Received update event: %s\n", e.Update.Pipeline.Id)
			case *grpcPipelines.StreamPipelinesEvent_Delete:
				fmt.Printf("Received delete event: %s\n", e.Delete.Pipeline.Id)
			default:
				panic(fmt.Sprintf("Unknown event type: %T", e))
			}
		}
	}()
}
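One common use of a create/update/delete event stream is to keep a local view of the pipelines on a node in sync without polling. The sketch below folds events into an in-memory map. The types eventKind, pipelineEvent, and pipelineCache are simplified, hypothetical stand-ins for the generated event messages; in real code, the switch in the goroutine above would call apply with the id and name taken from each event.

```go
package main

import "fmt"

// eventKind and pipelineEvent are simplified stand-ins for the generated
// create, update, and delete event variants.
type eventKind int

const (
	created eventKind = iota
	updated
	deleted
)

type pipelineEvent struct {
	Kind eventKind
	Id   string
	Name string
}

// pipelineCache maps a pipeline id to the latest known pipeline name.
type pipelineCache map[string]string

// apply folds a single event into the cache.
func (c pipelineCache) apply(e pipelineEvent) {
	switch e.Kind {
	case created, updated:
		c[e.Id] = e.Name
	case deleted:
		delete(c, e.Id)
	}
}

func main() {
	cache := pipelineCache{}
	events := []pipelineEvent{
		{Kind: created, Id: "p1", Name: "My First Pipeline"},
		{Kind: updated, Id: "p1", Name: "My Updated Pipeline"},
		{Kind: created, Id: "p2", Name: "Another Pipeline"},
		{Kind: deleted, Id: "p2"},
	}
	for _, e := range events {
		cache.apply(e)
	}
	fmt.Printf("%d pipeline(s) cached; p1 is named %q\n", len(cache), cache["p1"])
}
```

If the cache is read from other goroutines while the stream goroutine writes to it, guard it with a sync.Mutex or confine all access to one goroutine.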
Putting it all together
If you put all the above code together, you will have a complete example of how to connect to the GRPC API.
The complete code can be found at examples/go/main.go. Running the code should output something like the following:
Connection state: UNAUTHENTICATED
Updated client 4d3c0462-cf8d-4794-8ed3-aedbc712891a with node information
Connection state: CONNECTED
Authenticated with the Hub Server
All nodes:
Node: My Controller
Node: My NanoPing Client
Nodes with metadata type=my-type:
Node: My NanoPing Client
Nodes with Pipelines service enabled:
Node: My Controller
Node: My NanoPing Client
Doing CRUD pipeline operations on node My Controller
Created pipeline My First Pipeline with id 656c1d5f-3b9d-4d08-998a-2eb22d54ac39
Updated pipeline to name My Updated Pipeline for id 656c1d5f-3b9d-4d08-998a-2eb22d54ac39
Got pipeline by id 656c1d5f-3b9d-4d08-998a-2eb22d54ac39 with name My Updated Pipeline
Deleted pipeline with id 656c1d5f-3b9d-4d08-998a-2eb22d54ac39
Doing CRUD pipeline operations on node My NanoPing Client
Created pipeline My First Pipeline with id ca3a11ca-255a-4f86-a64e-66026c4949fd
Updated pipeline to name My Updated Pipeline for id ca3a11ca-255a-4f86-a64e-66026c4949fd
Got pipeline by id ca3a11ca-255a-4f86-a64e-66026c4949fd with name My Updated Pipeline
Deleted pipeline with id ca3a11ca-255a-4f86-a64e-66026c4949fd